001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.broker; 018 019import java.util.Collection; 020import java.util.Collections; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025 026import org.springframework.context.ApplicationEventPublisher; 027import org.springframework.context.ApplicationEventPublisherAware; 028import org.springframework.context.SmartLifecycle; 029import org.springframework.messaging.Message; 030import org.springframework.messaging.MessageChannel; 031import org.springframework.messaging.MessageHandler; 032import org.springframework.messaging.SubscribableChannel; 033import org.springframework.messaging.simp.SimpMessageHeaderAccessor; 034import org.springframework.messaging.simp.SimpMessageType; 035import org.springframework.messaging.support.ChannelInterceptor; 036import org.springframework.messaging.support.ChannelInterceptorAdapter; 037import org.springframework.messaging.support.InterceptableChannel; 038import org.springframework.util.Assert; 039import org.springframework.util.CollectionUtils; 040 041/** 042 * Abstract base class for a {@link MessageHandler} that broker messages to 043 * registered subscribers. 044 * 045 * @author Rossen Stoyanchev 046 * @since 4.0 047 */ 048public abstract class AbstractBrokerMessageHandler 049 implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle { 050 051 protected final Log logger = LogFactory.getLog(getClass()); 052 053 private final SubscribableChannel clientInboundChannel; 054 055 private final MessageChannel clientOutboundChannel; 056 057 private final SubscribableChannel brokerChannel; 058 059 private final Collection<String> destinationPrefixes; 060 061 private ApplicationEventPublisher eventPublisher; 062 063 private AtomicBoolean brokerAvailable = new AtomicBoolean(false); 064 065 private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this); 066 067 private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this); 068 069 private boolean autoStartup = true; 070 071 private volatile boolean running = false; 072 073 private final Object lifecycleMonitor = new Object(); 074 075 private final ChannelInterceptor unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor(); 076 077 078 /** 079 * Constructor with no destination prefixes (matches all destinations). 080 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 081 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 082 * @param brokerChannel the channel for the application to send messages to the broker 083 */ 084 public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 085 SubscribableChannel brokerChannel) { 086 087 this(inboundChannel, outboundChannel, brokerChannel, Collections.<String>emptyList()); 088 } 089 090 /** 091 * Constructor with destination prefixes to match to destinations of messages. 092 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 093 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 094 * @param brokerChannel the channel for the application to send messages to the broker 095 * @param destinationPrefixes prefixes to use to filter out messages 096 */ 097 public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 098 SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) { 099 100 Assert.notNull(inboundChannel, "'inboundChannel' must not be null"); 101 Assert.notNull(outboundChannel, "'outboundChannel' must not be null"); 102 Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); 103 104 this.clientInboundChannel = inboundChannel; 105 this.clientOutboundChannel = outboundChannel; 106 this.brokerChannel = brokerChannel; 107 108 destinationPrefixes = (destinationPrefixes != null ? destinationPrefixes : Collections.<String>emptyList()); 109 this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes); 110 } 111 112 113 public SubscribableChannel getClientInboundChannel() { 114 return this.clientInboundChannel; 115 } 116 117 public MessageChannel getClientOutboundChannel() { 118 return this.clientOutboundChannel; 119 } 120 121 public SubscribableChannel getBrokerChannel() { 122 return this.brokerChannel; 123 } 124 125 public Collection<String> getDestinationPrefixes() { 126 return this.destinationPrefixes; 127 } 128 129 @Override 130 public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { 131 this.eventPublisher = publisher; 132 } 133 134 public ApplicationEventPublisher getApplicationEventPublisher() { 135 return this.eventPublisher; 136 } 137 138 public void setAutoStartup(boolean autoStartup) { 139 this.autoStartup = autoStartup; 140 } 141 142 @Override 143 public boolean isAutoStartup() { 144 return this.autoStartup; 145 } 146 147 @Override 148 public int getPhase() { 149 return Integer.MAX_VALUE; 150 } 151 152 153 @Override 154 public void start() { 155 synchronized (this.lifecycleMonitor) { 156 logger.info("Starting..."); 157 this.clientInboundChannel.subscribe(this); 158 this.brokerChannel.subscribe(this); 159 if (this.clientInboundChannel instanceof InterceptableChannel) { 160 ((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor); 161 } 162 startInternal(); 163 this.running = true; 164 logger.info("Started."); 165 } 166 } 167 168 protected void startInternal() { 169 } 170 171 @Override 172 public void stop() { 173 synchronized (this.lifecycleMonitor) { 174 logger.info("Stopping..."); 175 stopInternal(); 176 this.clientInboundChannel.unsubscribe(this); 177 this.brokerChannel.unsubscribe(this); 178 if (this.clientInboundChannel instanceof InterceptableChannel) { 179 ((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor); 180 } 181 this.running = false; 182 logger.info("Stopped."); 183 } 184 } 185 186 protected void stopInternal() { 187 } 188 189 @Override 190 public final void stop(Runnable callback) { 191 synchronized (this.lifecycleMonitor) { 192 stop(); 193 callback.run(); 194 } 195 } 196 197 /** 198 * Check whether this message handler is currently running. 199 * <p>Note that even when this message handler is running the 200 * {@link #isBrokerAvailable()} flag may still independently alternate between 201 * being on and off depending on the concrete sub-class implementation. 202 */ 203 @Override 204 public final boolean isRunning() { 205 return this.running; 206 } 207 208 /** 209 * Whether the message broker is currently available and able to process messages. 210 * <p>Note that this is in addition to the {@link #isRunning()} flag, which 211 * indicates whether this message handler is running. In other words the message 212 * handler must first be running and then the {@code #isBrokerAvailable()} flag 213 * may still independently alternate between being on and off depending on the 214 * concrete sub-class implementation. 215 * <p>Application components may implement 216 * {@code org.springframework.context.ApplicationListener<BrokerAvailabilityEvent>} 217 * to receive notifications when broker becomes available and unavailable. 218 */ 219 public boolean isBrokerAvailable() { 220 return this.brokerAvailable.get(); 221 } 222 223 224 @Override 225 public void handleMessage(Message<?> message) { 226 if (!this.running) { 227 if (logger.isTraceEnabled()) { 228 logger.trace(this + " not running yet. Ignoring " + message); 229 } 230 return; 231 } 232 handleMessageInternal(message); 233 } 234 235 protected abstract void handleMessageInternal(Message<?> message); 236 237 238 protected boolean checkDestinationPrefix(String destination) { 239 if (destination == null || CollectionUtils.isEmpty(this.destinationPrefixes)) { 240 return true; 241 } 242 for (String prefix : this.destinationPrefixes) { 243 if (destination.startsWith(prefix)) { 244 return true; 245 } 246 } 247 return false; 248 } 249 250 protected void publishBrokerAvailableEvent() { 251 boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true); 252 if (this.eventPublisher != null && shouldPublish) { 253 if (logger.isInfoEnabled()) { 254 logger.info(this.availableEvent); 255 } 256 this.eventPublisher.publishEvent(this.availableEvent); 257 } 258 } 259 260 protected void publishBrokerUnavailableEvent() { 261 boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false); 262 if (this.eventPublisher != null && shouldPublish) { 263 if (logger.isInfoEnabled()) { 264 logger.info(this.notAvailableEvent); 265 } 266 this.eventPublisher.publishEvent(this.notAvailableEvent); 267 } 268 } 269 270 271 /** 272 * Detect unsent DISCONNECT messages and process them anyway. 273 */ 274 private class UnsentDisconnectChannelInterceptor extends ChannelInterceptorAdapter { 275 276 @Override 277 public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { 278 if (!sent) { 279 SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders()); 280 if (SimpMessageType.DISCONNECT.equals(messageType)) { 281 logger.debug("Detected unsent DISCONNECT message. Processing anyway."); 282 handleMessage(message); 283 } 284 } 285 } 286 } 287 288}