001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.user;
018
019import java.util.Arrays;
020import java.util.List;
021
022import org.apache.commons.logging.Log;
023
024import org.springframework.context.SmartLifecycle;
025import org.springframework.lang.Nullable;
026import org.springframework.messaging.Message;
027import org.springframework.messaging.MessageHandler;
028import org.springframework.messaging.MessageHeaders;
029import org.springframework.messaging.MessagingException;
030import org.springframework.messaging.SubscribableChannel;
031import org.springframework.messaging.core.MessageSendingOperations;
032import org.springframework.messaging.simp.SimpLogging;
033import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
034import org.springframework.messaging.simp.SimpMessageType;
035import org.springframework.messaging.simp.SimpMessagingTemplate;
036import org.springframework.messaging.support.MessageBuilder;
037import org.springframework.messaging.support.MessageHeaderInitializer;
038import org.springframework.util.Assert;
039import org.springframework.util.StringUtils;
040
041/**
042 * {@code MessageHandler} with support for "user" destinations.
043 *
044 * <p>Listens for messages with "user" destinations, translates their destination
045 * to actual target destinations unique to the active session(s) of a user, and
046 * then sends the resolved messages to the broker channel to be delivered.
047 *
048 * @author Rossen Stoyanchev
049 * @since 4.0
050 */
051public class UserDestinationMessageHandler implements MessageHandler, SmartLifecycle {
052
053        private static final Log logger = SimpLogging.forLogName(UserDestinationMessageHandler.class);
054
055
056        private final SubscribableChannel clientInboundChannel;
057
058        private final SubscribableChannel brokerChannel;
059
060        private final UserDestinationResolver destinationResolver;
061
062        private final MessageSendingOperations<String> messagingTemplate;
063
064        @Nullable
065        private BroadcastHandler broadcastHandler;
066
067        @Nullable
068        private MessageHeaderInitializer headerInitializer;
069
070        private volatile boolean running = false;
071
072        private final Object lifecycleMonitor = new Object();
073
074
075        /**
076         * Create an instance with the given client and broker channels subscribing
077         * to handle messages from each and then sending any resolved messages to the
078         * broker channel.
079         * @param clientInboundChannel messages received from clients.
080         * @param brokerChannel messages sent to the broker.
081         * @param resolver the resolver for "user" destinations.
082         */
083        public UserDestinationMessageHandler(SubscribableChannel clientInboundChannel,
084                        SubscribableChannel brokerChannel, UserDestinationResolver resolver) {
085
086                Assert.notNull(clientInboundChannel, "'clientInChannel' must not be null");
087                Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
088                Assert.notNull(resolver, "resolver must not be null");
089
090                this.clientInboundChannel = clientInboundChannel;
091                this.brokerChannel = brokerChannel;
092                this.messagingTemplate = new SimpMessagingTemplate(brokerChannel);
093                this.destinationResolver = resolver;
094        }
095
096
097        /**
098         * Return the configured {@link UserDestinationResolver}.
099         */
100        public UserDestinationResolver getUserDestinationResolver() {
101                return this.destinationResolver;
102        }
103
104        /**
105         * Set a destination to broadcast messages to that remain unresolved because
106         * the user is not connected. In a multi-application server scenario this
107         * gives other application servers a chance to try.
108         * <p>By default this is not set.
109         * @param destination the target destination.
110         */
111        public void setBroadcastDestination(@Nullable String destination) {
112                this.broadcastHandler = (StringUtils.hasText(destination) ?
113                                new BroadcastHandler(this.messagingTemplate, destination) : null);
114        }
115
116        /**
117         * Return the configured destination for unresolved messages.
118         */
119        @Nullable
120        public String getBroadcastDestination() {
121                return (this.broadcastHandler != null ? this.broadcastHandler.getBroadcastDestination() : null);
122        }
123
124        /**
125         * Return the messaging template used to send resolved messages to the
126         * broker channel.
127         */
128        public MessageSendingOperations<String> getBrokerMessagingTemplate() {
129                return this.messagingTemplate;
130        }
131
132        /**
133         * Configure a custom {@link MessageHeaderInitializer} to initialize the
134         * headers of resolved target messages.
135         * <p>By default this is not set.
136         */
137        public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer) {
138                this.headerInitializer = headerInitializer;
139        }
140
141        /**
142         * Return the configured header initializer.
143         */
144        @Nullable
145        public MessageHeaderInitializer getHeaderInitializer() {
146                return this.headerInitializer;
147        }
148
149
150        @Override
151        public final void start() {
152                synchronized (this.lifecycleMonitor) {
153                        this.clientInboundChannel.subscribe(this);
154                        this.brokerChannel.subscribe(this);
155                        this.running = true;
156                }
157        }
158
159        @Override
160        public final void stop() {
161                synchronized (this.lifecycleMonitor) {
162                        this.running = false;
163                        this.clientInboundChannel.unsubscribe(this);
164                        this.brokerChannel.unsubscribe(this);
165                }
166        }
167
168        @Override
169        public final void stop(Runnable callback) {
170                synchronized (this.lifecycleMonitor) {
171                        stop();
172                        callback.run();
173                }
174        }
175
176        @Override
177        public final boolean isRunning() {
178                return this.running;
179        }
180
181
182        @Override
183        public void handleMessage(Message<?> message) throws MessagingException {
184                Message<?> messageToUse = message;
185                if (this.broadcastHandler != null) {
186                        messageToUse = this.broadcastHandler.preHandle(message);
187                        if (messageToUse == null) {
188                                return;
189                        }
190                }
191
192                UserDestinationResult result = this.destinationResolver.resolveDestination(messageToUse);
193                if (result == null) {
194                        return;
195                }
196
197                if (result.getTargetDestinations().isEmpty()) {
198                        if (logger.isTraceEnabled()) {
199                                logger.trace("No active sessions for user destination: " + result.getSourceDestination());
200                        }
201                        if (this.broadcastHandler != null) {
202                                this.broadcastHandler.handleUnresolved(messageToUse);
203                        }
204                        return;
205                }
206
207                SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(messageToUse);
208                initHeaders(accessor);
209                accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
210                accessor.setLeaveMutable(true);
211
212                messageToUse = MessageBuilder.createMessage(messageToUse.getPayload(), accessor.getMessageHeaders());
213                if (logger.isTraceEnabled()) {
214                        logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations());
215                }
216                for (String target : result.getTargetDestinations()) {
217                        this.messagingTemplate.send(target, messageToUse);
218                }
219        }
220
221        private void initHeaders(SimpMessageHeaderAccessor headerAccessor) {
222                if (getHeaderInitializer() != null) {
223                        getHeaderInitializer().initHeaders(headerAccessor);
224                }
225        }
226
227        @Override
228        public String toString() {
229                return "UserDestinationMessageHandler[" + this.destinationResolver + "]";
230        }
231
232
233        /**
234         * A handler that broadcasts locally unresolved messages to the broker and
235         * also handles similar broadcasts received from the broker.
236         */
237        private static class BroadcastHandler {
238
239                private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id");
240
241                private final MessageSendingOperations<String> messagingTemplate;
242
243                private final String broadcastDestination;
244
245                public BroadcastHandler(MessageSendingOperations<String> template, String destination) {
246                        this.messagingTemplate = template;
247                        this.broadcastDestination = destination;
248                }
249
250                public String getBroadcastDestination() {
251                        return this.broadcastDestination;
252                }
253
254                @Nullable
255                public Message<?> preHandle(Message<?> message) throws MessagingException {
256                        String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
257                        if (!getBroadcastDestination().equals(destination)) {
258                                return message;
259                        }
260                        SimpMessageHeaderAccessor accessor =
261                                        SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
262                        Assert.state(accessor != null, "No SimpMessageHeaderAccessor");
263                        if (accessor.getSessionId() == null) {
264                                // Our own broadcast
265                                return null;
266                        }
267                        destination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
268                        if (logger.isTraceEnabled()) {
269                                logger.trace("Checking unresolved user destination: " + destination);
270                        }
271                        SimpMessageHeaderAccessor newAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
272                        for (String name : accessor.toNativeHeaderMap().keySet()) {
273                                if (NO_COPY_LIST.contains(name)) {
274                                        continue;
275                                }
276                                newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name));
277                        }
278                        if (destination != null) {
279                                newAccessor.setDestination(destination);
280                        }
281                        newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block
282                        return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders());
283                }
284
285                public void handleUnresolved(Message<?> message) {
286                        MessageHeaders headers = message.getHeaders();
287                        if (SimpMessageHeaderAccessor.getFirstNativeHeader(
288                                        SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, headers) != null) {
289                                // Re-broadcast
290                                return;
291                        }
292                        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
293                        String destination = accessor.getDestination();
294                        accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, destination);
295                        accessor.setLeaveMutable(true);
296                        message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
297                        if (logger.isTraceEnabled()) {
298                                logger.trace("Translated " + destination + " -> " + getBroadcastDestination());
299                        }
300                        this.messagingTemplate.send(getBroadcastDestination(), message);
301                }
302        }
303
304}