001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.broker;
018
019import java.util.Collection;
020import java.util.Collections;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import org.apache.commons.logging.Log;
024
025import org.springframework.context.ApplicationEventPublisher;
026import org.springframework.context.ApplicationEventPublisherAware;
027import org.springframework.context.SmartLifecycle;
028import org.springframework.lang.Nullable;
029import org.springframework.messaging.Message;
030import org.springframework.messaging.MessageChannel;
031import org.springframework.messaging.MessageHandler;
032import org.springframework.messaging.SubscribableChannel;
033import org.springframework.messaging.simp.SimpLogging;
034import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
035import org.springframework.messaging.simp.SimpMessageType;
036import org.springframework.messaging.support.ChannelInterceptor;
037import org.springframework.messaging.support.InterceptableChannel;
038import org.springframework.util.Assert;
039import org.springframework.util.CollectionUtils;
040
041/**
042 * Abstract base class for a {@link MessageHandler} that broker messages to
043 * registered subscribers.
044 *
045 * @author Rossen Stoyanchev
046 * @since 4.0
047 */
048public abstract class AbstractBrokerMessageHandler
049                implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle {
050
051        protected final Log logger = SimpLogging.forLogName(getClass());
052
053        private final SubscribableChannel clientInboundChannel;
054
055        private final MessageChannel clientOutboundChannel;
056
057        private final SubscribableChannel brokerChannel;
058
059        private final Collection<String> destinationPrefixes;
060
061        private boolean preservePublishOrder = false;
062
063        @Nullable
064        private ApplicationEventPublisher eventPublisher;
065
066        private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
067
068        private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this);
069
070        private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this);
071
072        private boolean autoStartup = true;
073
074        private volatile boolean running = false;
075
076        private final Object lifecycleMonitor = new Object();
077
078        private final ChannelInterceptor unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor();
079
080
081        /**
082         * Constructor with no destination prefixes (matches all destinations).
083         * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
084         * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
085         * @param brokerChannel the channel for the application to send messages to the broker
086         */
087        public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
088                        SubscribableChannel brokerChannel) {
089
090                this(inboundChannel, outboundChannel, brokerChannel, Collections.emptyList());
091        }
092
093        /**
094         * Constructor with destination prefixes to match to destinations of messages.
095         * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
096         * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
097         * @param brokerChannel the channel for the application to send messages to the broker
098         * @param destinationPrefixes prefixes to use to filter out messages
099         */
100        public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
101                        SubscribableChannel brokerChannel, @Nullable Collection<String> destinationPrefixes) {
102
103                Assert.notNull(inboundChannel, "'inboundChannel' must not be null");
104                Assert.notNull(outboundChannel, "'outboundChannel' must not be null");
105                Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
106
107                this.clientInboundChannel = inboundChannel;
108                this.clientOutboundChannel = outboundChannel;
109                this.brokerChannel = brokerChannel;
110
111                destinationPrefixes = (destinationPrefixes != null ? destinationPrefixes : Collections.emptyList());
112                this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes);
113        }
114
115
116        public SubscribableChannel getClientInboundChannel() {
117                return this.clientInboundChannel;
118        }
119
120        public MessageChannel getClientOutboundChannel() {
121                return this.clientOutboundChannel;
122        }
123
124        public SubscribableChannel getBrokerChannel() {
125                return this.brokerChannel;
126        }
127
128        public Collection<String> getDestinationPrefixes() {
129                return this.destinationPrefixes;
130        }
131
132        /**
133         * Whether the client must receive messages in the order of publication.
134         * <p>By default messages sent to the {@code "clientOutboundChannel"} may
135         * not be processed in the same order because the channel is backed by a
136         * ThreadPoolExecutor that in turn does not guarantee processing in order.
137         * <p>When this flag is set to {@code true} messages within the same session
138         * will be sent to the {@code "clientOutboundChannel"} one at a time in
139         * order to preserve the order of publication. Enable this only if needed
140         * since there is some performance overhead to keep messages in order.
141         * @param preservePublishOrder whether to publish in order
142         * @since 5.1
143         */
144        public void setPreservePublishOrder(boolean preservePublishOrder) {
145                OrderedMessageSender.configureOutboundChannel(this.clientOutboundChannel, preservePublishOrder);
146                this.preservePublishOrder = preservePublishOrder;
147        }
148
149        /**
150         * Whether to ensure messages are received in the order of publication.
151         * @since 5.1
152         */
153        public boolean isPreservePublishOrder() {
154                return this.preservePublishOrder;
155        }
156
157        @Override
158        public void setApplicationEventPublisher(@Nullable ApplicationEventPublisher publisher) {
159                this.eventPublisher = publisher;
160        }
161
162        @Nullable
163        public ApplicationEventPublisher getApplicationEventPublisher() {
164                return this.eventPublisher;
165        }
166
167        public void setAutoStartup(boolean autoStartup) {
168                this.autoStartup = autoStartup;
169        }
170
171        @Override
172        public boolean isAutoStartup() {
173                return this.autoStartup;
174        }
175
176
177        @Override
178        public void start() {
179                synchronized (this.lifecycleMonitor) {
180                        logger.info("Starting...");
181                        this.clientInboundChannel.subscribe(this);
182                        this.brokerChannel.subscribe(this);
183                        if (this.clientInboundChannel instanceof InterceptableChannel) {
184                                ((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor);
185                        }
186                        startInternal();
187                        this.running = true;
188                        logger.info("Started.");
189                }
190        }
191
192        protected void startInternal() {
193        }
194
195        @Override
196        public void stop() {
197                synchronized (this.lifecycleMonitor) {
198                        logger.info("Stopping...");
199                        stopInternal();
200                        this.clientInboundChannel.unsubscribe(this);
201                        this.brokerChannel.unsubscribe(this);
202                        if (this.clientInboundChannel instanceof InterceptableChannel) {
203                                ((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor);
204                        }
205                        this.running = false;
206                        logger.info("Stopped.");
207                }
208        }
209
210        protected void stopInternal() {
211        }
212
213        @Override
214        public final void stop(Runnable callback) {
215                synchronized (this.lifecycleMonitor) {
216                        stop();
217                        callback.run();
218                }
219        }
220
221        /**
222         * Check whether this message handler is currently running.
223         * <p>Note that even when this message handler is running the
224         * {@link #isBrokerAvailable()} flag may still independently alternate between
225         * being on and off depending on the concrete sub-class implementation.
226         */
227        @Override
228        public final boolean isRunning() {
229                return this.running;
230        }
231
232        /**
233         * Whether the message broker is currently available and able to process messages.
234         * <p>Note that this is in addition to the {@link #isRunning()} flag, which
235         * indicates whether this message handler is running. In other words the message
236         * handler must first be running and then the {@code #isBrokerAvailable()} flag
237         * may still independently alternate between being on and off depending on the
238         * concrete sub-class implementation.
239         * <p>Application components may implement
240         * {@code org.springframework.context.ApplicationListener&lt;BrokerAvailabilityEvent&gt;}
241         * to receive notifications when broker becomes available and unavailable.
242         */
243        public boolean isBrokerAvailable() {
244                return this.brokerAvailable.get();
245        }
246
247
248        @Override
249        public void handleMessage(Message<?> message) {
250                if (!this.running) {
251                        if (logger.isTraceEnabled()) {
252                                logger.trace(this + " not running yet. Ignoring " + message);
253                        }
254                        return;
255                }
256                handleMessageInternal(message);
257        }
258
259        protected abstract void handleMessageInternal(Message<?> message);
260
261
262        protected boolean checkDestinationPrefix(@Nullable String destination) {
263                if (destination == null || CollectionUtils.isEmpty(this.destinationPrefixes)) {
264                        return true;
265                }
266                for (String prefix : this.destinationPrefixes) {
267                        if (destination.startsWith(prefix)) {
268                                return true;
269                        }
270                }
271                return false;
272        }
273
274        protected void publishBrokerAvailableEvent() {
275                boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
276                if (this.eventPublisher != null && shouldPublish) {
277                        if (logger.isInfoEnabled()) {
278                                logger.info(this.availableEvent);
279                        }
280                        this.eventPublisher.publishEvent(this.availableEvent);
281                }
282        }
283
284        protected void publishBrokerUnavailableEvent() {
285                boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false);
286                if (this.eventPublisher != null && shouldPublish) {
287                        if (logger.isInfoEnabled()) {
288                                logger.info(this.notAvailableEvent);
289                        }
290                        this.eventPublisher.publishEvent(this.notAvailableEvent);
291                }
292        }
293
294        /**
295         * Get the MessageChannel to use for sending messages to clients, possibly
296         * a per-session wrapper when {@code preservePublishOrder=true}.
297         * @since 5.1
298         */
299        protected MessageChannel getClientOutboundChannelForSession(String sessionId) {
300                return this.preservePublishOrder ?
301                                new OrderedMessageSender(getClientOutboundChannel(), logger) : getClientOutboundChannel();
302        }
303
304
305        /**
306         * Detect unsent DISCONNECT messages and process them anyway.
307         */
308        private class UnsentDisconnectChannelInterceptor implements ChannelInterceptor {
309
310                @Override
311                public void afterSendCompletion(
312                                Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {
313
314                        if (!sent) {
315                                SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
316                                if (SimpMessageType.DISCONNECT.equals(messageType)) {
317                                        logger.debug("Detected unsent DISCONNECT message. Processing anyway.");
318                                        handleMessage(message);
319                                }
320                        }
321                }
322        }
323
324}