001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.user;
018
019import java.util.Arrays;
020import java.util.List;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024
025import org.springframework.context.SmartLifecycle;
026import org.springframework.messaging.Message;
027import org.springframework.messaging.MessageHandler;
028import org.springframework.messaging.MessageHeaders;
029import org.springframework.messaging.MessagingException;
030import org.springframework.messaging.SubscribableChannel;
031import org.springframework.messaging.core.MessageSendingOperations;
032import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
033import org.springframework.messaging.simp.SimpMessageType;
034import org.springframework.messaging.simp.SimpMessagingTemplate;
035import org.springframework.messaging.support.MessageBuilder;
036import org.springframework.messaging.support.MessageHeaderInitializer;
037import org.springframework.util.Assert;
038import org.springframework.util.StringUtils;
039
040/**
041 * {@code MessageHandler} with support for "user" destinations.
042 *
043 * <p>Listens for messages with "user" destinations, translates their destination
044 * to actual target destinations unique to the active session(s) of a user, and
045 * then sends the resolved messages to the broker channel to be delivered.
046 *
047 * @author Rossen Stoyanchev
048 * @since 4.0
049 */
050public class UserDestinationMessageHandler implements MessageHandler, SmartLifecycle {
051
052        private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class);
053
054
055        private final SubscribableChannel clientInboundChannel;
056
057        private final SubscribableChannel brokerChannel;
058
059        private final UserDestinationResolver destinationResolver;
060
061        private final MessageSendingOperations<String> messagingTemplate;
062
063        private BroadcastHandler broadcastHandler;
064
065        private MessageHeaderInitializer headerInitializer;
066
067        private volatile boolean running = false;
068
069        private final Object lifecycleMonitor = new Object();
070
071
072        /**
073         * Create an instance with the given client and broker channels subscribing
074         * to handle messages from each and then sending any resolved messages to the
075         * broker channel.
076         * @param clientInboundChannel messages received from clients.
077         * @param brokerChannel messages sent to the broker.
078         * @param resolver the resolver for "user" destinations.
079         */
080        public UserDestinationMessageHandler(SubscribableChannel clientInboundChannel,
081                        SubscribableChannel brokerChannel, UserDestinationResolver resolver) {
082
083                Assert.notNull(clientInboundChannel, "'clientInChannel' must not be null");
084                Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
085                Assert.notNull(resolver, "resolver must not be null");
086
087                this.clientInboundChannel = clientInboundChannel;
088                this.brokerChannel = brokerChannel;
089                this.messagingTemplate = new SimpMessagingTemplate(brokerChannel);
090                this.destinationResolver = resolver;
091        }
092
093
094        /**
095         * Return the configured {@link UserDestinationResolver}.
096         */
097        public UserDestinationResolver getUserDestinationResolver() {
098                return this.destinationResolver;
099        }
100
101        /**
102         * Set a destination to broadcast messages to that remain unresolved because
103         * the user is not connected. In a multi-application server scenario this
104         * gives other application servers a chance to try.
105         * <p>By default this is not set.
106         * @param destination the target destination.
107         */
108        public void setBroadcastDestination(String destination) {
109                this.broadcastHandler = (StringUtils.hasText(destination) ?
110                                new BroadcastHandler(this.messagingTemplate, destination) : null);
111        }
112
113        /**
114         * Return the configured destination for unresolved messages.
115         */
116        public String getBroadcastDestination() {
117                return (this.broadcastHandler != null ? this.broadcastHandler.getBroadcastDestination() : null);
118        }
119
120        /**
121         * Return the messaging template used to send resolved messages to the
122         * broker channel.
123         */
124        public MessageSendingOperations<String> getBrokerMessagingTemplate() {
125                return this.messagingTemplate;
126        }
127
128        /**
129         * Configure a custom {@link MessageHeaderInitializer} to initialize the
130         * headers of resolved target messages.
131         * <p>By default this is not set.
132         */
133        public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
134                this.headerInitializer = headerInitializer;
135        }
136
137        /**
138         * Return the configured header initializer.
139         */
140        public MessageHeaderInitializer getHeaderInitializer() {
141                return this.headerInitializer;
142        }
143
144
145        @Override
146        public boolean isAutoStartup() {
147                return true;
148        }
149
150        @Override
151        public int getPhase() {
152                return Integer.MAX_VALUE;
153        }
154
155        @Override
156        public final void start() {
157                synchronized (this.lifecycleMonitor) {
158                        this.clientInboundChannel.subscribe(this);
159                        this.brokerChannel.subscribe(this);
160                        this.running = true;
161                }
162        }
163
164        @Override
165        public final void stop() {
166                synchronized (this.lifecycleMonitor) {
167                        this.running = false;
168                        this.clientInboundChannel.unsubscribe(this);
169                        this.brokerChannel.unsubscribe(this);
170                }
171        }
172
173        @Override
174        public final void stop(Runnable callback) {
175                synchronized (this.lifecycleMonitor) {
176                        stop();
177                        callback.run();
178                }
179        }
180
181        @Override
182        public final boolean isRunning() {
183                return this.running;
184        }
185
186
187        @Override
188        public void handleMessage(Message<?> message) throws MessagingException {
189                if (this.broadcastHandler != null) {
190                        message = this.broadcastHandler.preHandle(message);
191                        if (message == null) {
192                                return;
193                        }
194                }
195                UserDestinationResult result = this.destinationResolver.resolveDestination(message);
196                if (result == null) {
197                        return;
198                }
199                if (result.getTargetDestinations().isEmpty()) {
200                        if (logger.isTraceEnabled()) {
201                                logger.trace("No active sessions for user destination: " + result.getSourceDestination());
202                        }
203                        if (this.broadcastHandler != null) {
204                                this.broadcastHandler.handleUnresolved(message);
205                        }
206                        return;
207                }
208                SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
209                initHeaders(accessor);
210                accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
211                accessor.setLeaveMutable(true);
212                message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
213                if (logger.isTraceEnabled()) {
214                        logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations());
215                }
216                for (String target : result.getTargetDestinations()) {
217                        this.messagingTemplate.send(target, message);
218                }
219        }
220
221        private void initHeaders(SimpMessageHeaderAccessor headerAccessor) {
222                if (getHeaderInitializer() != null) {
223                        getHeaderInitializer().initHeaders(headerAccessor);
224                }
225        }
226
227        @Override
228        public String toString() {
229                return "UserDestinationMessageHandler[" + this.destinationResolver + "]";
230        }
231
232
233        /**
234         * A handler that broadcasts locally unresolved messages to the broker and
235         * also handles similar broadcasts received from the broker.
236         */
237        private static class BroadcastHandler {
238
239                private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id");
240
241                private final MessageSendingOperations<String> messagingTemplate;
242
243                private final String broadcastDestination;
244
245                public BroadcastHandler(MessageSendingOperations<String> template, String destination) {
246                        this.messagingTemplate = template;
247                        this.broadcastDestination = destination;
248                }
249
250                public String getBroadcastDestination() {
251                        return this.broadcastDestination;
252                }
253
254                public Message<?> preHandle(Message<?> message) throws MessagingException {
255                        String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
256                        if (!getBroadcastDestination().equals(destination)) {
257                                return message;
258                        }
259                        SimpMessageHeaderAccessor accessor =
260                                        SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
261                        if (accessor.getSessionId() == null) {
262                                // Our own broadcast
263                                return null;
264                        }
265                        destination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
266                        if (logger.isTraceEnabled()) {
267                                logger.trace("Checking unresolved user destination: " + destination);
268                        }
269                        SimpMessageHeaderAccessor newAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
270                        for (String name : accessor.toNativeHeaderMap().keySet()) {
271                                if (NO_COPY_LIST.contains(name)) {
272                                        continue;
273                                }
274                                newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name));
275                        }
276                        newAccessor.setDestination(destination);
277                        newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block
278                        return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders());
279                }
280
281                public void handleUnresolved(Message<?> message) {
282                        MessageHeaders headers = message.getHeaders();
283                        if (SimpMessageHeaderAccessor.getFirstNativeHeader(
284                                        SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, headers) != null) {
285                                // Re-broadcast
286                                return;
287                        }
288                        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
289                        String destination = accessor.getDestination();
290                        accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, destination);
291                        accessor.setLeaveMutable(true);
292                        message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
293                        if (logger.isTraceEnabled()) {
294                                logger.trace("Translated " + destination + " -> " + getBroadcastDestination());
295                        }
296                        this.messagingTemplate.send(getBroadcastDestination(), message);
297                }
298        }
299
300}