001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.broker;
018
019import java.util.Collection;
020import java.util.Collections;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025
026import org.springframework.context.ApplicationEventPublisher;
027import org.springframework.context.ApplicationEventPublisherAware;
028import org.springframework.context.SmartLifecycle;
029import org.springframework.messaging.Message;
030import org.springframework.messaging.MessageChannel;
031import org.springframework.messaging.MessageHandler;
032import org.springframework.messaging.SubscribableChannel;
033import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
034import org.springframework.messaging.simp.SimpMessageType;
035import org.springframework.messaging.support.ChannelInterceptor;
036import org.springframework.messaging.support.ChannelInterceptorAdapter;
037import org.springframework.messaging.support.InterceptableChannel;
038import org.springframework.util.Assert;
039import org.springframework.util.CollectionUtils;
040
041/**
042 * Abstract base class for a {@link MessageHandler} that broker messages to
043 * registered subscribers.
044 *
045 * @author Rossen Stoyanchev
046 * @since 4.0
047 */
048public abstract class AbstractBrokerMessageHandler
049                implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle {
050
051        protected final Log logger = LogFactory.getLog(getClass());
052
053        private final SubscribableChannel clientInboundChannel;
054
055        private final MessageChannel clientOutboundChannel;
056
057        private final SubscribableChannel brokerChannel;
058
059        private final Collection<String> destinationPrefixes;
060
061        private ApplicationEventPublisher eventPublisher;
062
063        private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
064
065        private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this);
066
067        private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this);
068
069        private boolean autoStartup = true;
070
071        private volatile boolean running = false;
072
073        private final Object lifecycleMonitor = new Object();
074
075        private final ChannelInterceptor unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor();
076
077
078        /**
079         * Constructor with no destination prefixes (matches all destinations).
080         * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
081         * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
082         * @param brokerChannel the channel for the application to send messages to the broker
083         */
084        public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
085                        SubscribableChannel brokerChannel) {
086
087                this(inboundChannel, outboundChannel, brokerChannel, Collections.<String>emptyList());
088        }
089
090        /**
091         * Constructor with destination prefixes to match to destinations of messages.
092         * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
093         * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
094         * @param brokerChannel the channel for the application to send messages to the broker
095         * @param destinationPrefixes prefixes to use to filter out messages
096         */
097        public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
098                        SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
099
100                Assert.notNull(inboundChannel, "'inboundChannel' must not be null");
101                Assert.notNull(outboundChannel, "'outboundChannel' must not be null");
102                Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
103
104                this.clientInboundChannel = inboundChannel;
105                this.clientOutboundChannel = outboundChannel;
106                this.brokerChannel = brokerChannel;
107
108                destinationPrefixes = (destinationPrefixes != null ? destinationPrefixes : Collections.<String>emptyList());
109                this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes);
110        }
111
112
113        public SubscribableChannel getClientInboundChannel() {
114                return this.clientInboundChannel;
115        }
116
117        public MessageChannel getClientOutboundChannel() {
118                return this.clientOutboundChannel;
119        }
120
121        public SubscribableChannel getBrokerChannel() {
122                return this.brokerChannel;
123        }
124
125        public Collection<String> getDestinationPrefixes() {
126                return this.destinationPrefixes;
127        }
128
129        @Override
130        public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
131                this.eventPublisher = publisher;
132        }
133
134        public ApplicationEventPublisher getApplicationEventPublisher() {
135                return this.eventPublisher;
136        }
137
138        public void setAutoStartup(boolean autoStartup) {
139                this.autoStartup = autoStartup;
140        }
141
142        @Override
143        public boolean isAutoStartup() {
144                return this.autoStartup;
145        }
146
147        @Override
148        public int getPhase() {
149                return Integer.MAX_VALUE;
150        }
151
152
153        @Override
154        public void start() {
155                synchronized (this.lifecycleMonitor) {
156                        logger.info("Starting...");
157                        this.clientInboundChannel.subscribe(this);
158                        this.brokerChannel.subscribe(this);
159                        if (this.clientInboundChannel instanceof InterceptableChannel) {
160                                ((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor);
161                        }
162                        startInternal();
163                        this.running = true;
164                        logger.info("Started.");
165                }
166        }
167
168        protected void startInternal() {
169        }
170
171        @Override
172        public void stop() {
173                synchronized (this.lifecycleMonitor) {
174                        logger.info("Stopping...");
175                        stopInternal();
176                        this.clientInboundChannel.unsubscribe(this);
177                        this.brokerChannel.unsubscribe(this);
178                        if (this.clientInboundChannel instanceof InterceptableChannel) {
179                                ((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor);
180                        }
181                        this.running = false;
182                        logger.info("Stopped.");
183                }
184        }
185
186        protected void stopInternal() {
187        }
188
189        @Override
190        public final void stop(Runnable callback) {
191                synchronized (this.lifecycleMonitor) {
192                        stop();
193                        callback.run();
194                }
195        }
196
197        /**
198         * Check whether this message handler is currently running.
199         * <p>Note that even when this message handler is running the
200         * {@link #isBrokerAvailable()} flag may still independently alternate between
201         * being on and off depending on the concrete sub-class implementation.
202         */
203        @Override
204        public final boolean isRunning() {
205                return this.running;
206        }
207
208        /**
209         * Whether the message broker is currently available and able to process messages.
210         * <p>Note that this is in addition to the {@link #isRunning()} flag, which
211         * indicates whether this message handler is running. In other words the message
212         * handler must first be running and then the {@code #isBrokerAvailable()} flag
213         * may still independently alternate between being on and off depending on the
214         * concrete sub-class implementation.
215         * <p>Application components may implement
216         * {@code org.springframework.context.ApplicationListener&lt;BrokerAvailabilityEvent&gt;}
217         * to receive notifications when broker becomes available and unavailable.
218         */
219        public boolean isBrokerAvailable() {
220                return this.brokerAvailable.get();
221        }
222
223
224        @Override
225        public void handleMessage(Message<?> message) {
226                if (!this.running) {
227                        if (logger.isTraceEnabled()) {
228                                logger.trace(this + " not running yet. Ignoring " + message);
229                        }
230                        return;
231                }
232                handleMessageInternal(message);
233        }
234
235        protected abstract void handleMessageInternal(Message<?> message);
236
237
238        protected boolean checkDestinationPrefix(String destination) {
239                if (destination == null || CollectionUtils.isEmpty(this.destinationPrefixes)) {
240                        return true;
241                }
242                for (String prefix : this.destinationPrefixes) {
243                        if (destination.startsWith(prefix)) {
244                                return true;
245                        }
246                }
247                return false;
248        }
249
250        protected void publishBrokerAvailableEvent() {
251                boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
252                if (this.eventPublisher != null && shouldPublish) {
253                        if (logger.isInfoEnabled()) {
254                                logger.info(this.availableEvent);
255                        }
256                        this.eventPublisher.publishEvent(this.availableEvent);
257                }
258        }
259
260        protected void publishBrokerUnavailableEvent() {
261                boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false);
262                if (this.eventPublisher != null && shouldPublish) {
263                        if (logger.isInfoEnabled()) {
264                                logger.info(this.notAvailableEvent);
265                        }
266                        this.eventPublisher.publishEvent(this.notAvailableEvent);
267                }
268        }
269
270
271        /**
272         * Detect unsent DISCONNECT messages and process them anyway.
273         */
274        private class UnsentDisconnectChannelInterceptor extends ChannelInterceptorAdapter {
275
276                @Override
277                public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
278                        if (!sent) {
279                                SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
280                                if (SimpMessageType.DISCONNECT.equals(messageType)) {
281                                        logger.debug("Detected unsent DISCONNECT message. Processing anyway.");
282                                        handleMessage(message);
283                                }
284                        }
285                }
286        }
287
288}