001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.user; 018 019import java.util.Arrays; 020import java.util.List; 021 022import org.apache.commons.logging.Log; 023 024import org.springframework.context.SmartLifecycle; 025import org.springframework.lang.Nullable; 026import org.springframework.messaging.Message; 027import org.springframework.messaging.MessageHandler; 028import org.springframework.messaging.MessageHeaders; 029import org.springframework.messaging.MessagingException; 030import org.springframework.messaging.SubscribableChannel; 031import org.springframework.messaging.core.MessageSendingOperations; 032import org.springframework.messaging.simp.SimpLogging; 033import org.springframework.messaging.simp.SimpMessageHeaderAccessor; 034import org.springframework.messaging.simp.SimpMessageType; 035import org.springframework.messaging.simp.SimpMessagingTemplate; 036import org.springframework.messaging.support.MessageBuilder; 037import org.springframework.messaging.support.MessageHeaderInitializer; 038import org.springframework.util.Assert; 039import org.springframework.util.StringUtils; 040 041/** 042 * {@code MessageHandler} with support for "user" destinations. 043 * 044 * <p>Listens for messages with "user" destinations, translates their destination 045 * to actual target destinations unique to the active session(s) of a user, and 046 * then sends the resolved messages to the broker channel to be delivered. 047 * 048 * @author Rossen Stoyanchev 049 * @since 4.0 050 */ 051public class UserDestinationMessageHandler implements MessageHandler, SmartLifecycle { 052 053 private static final Log logger = SimpLogging.forLogName(UserDestinationMessageHandler.class); 054 055 056 private final SubscribableChannel clientInboundChannel; 057 058 private final SubscribableChannel brokerChannel; 059 060 private final UserDestinationResolver destinationResolver; 061 062 private final MessageSendingOperations<String> messagingTemplate; 063 064 @Nullable 065 private BroadcastHandler broadcastHandler; 066 067 @Nullable 068 private MessageHeaderInitializer headerInitializer; 069 070 private volatile boolean running = false; 071 072 private final Object lifecycleMonitor = new Object(); 073 074 075 /** 076 * Create an instance with the given client and broker channels subscribing 077 * to handle messages from each and then sending any resolved messages to the 078 * broker channel. 079 * @param clientInboundChannel messages received from clients. 080 * @param brokerChannel messages sent to the broker. 081 * @param resolver the resolver for "user" destinations. 082 */ 083 public UserDestinationMessageHandler(SubscribableChannel clientInboundChannel, 084 SubscribableChannel brokerChannel, UserDestinationResolver resolver) { 085 086 Assert.notNull(clientInboundChannel, "'clientInChannel' must not be null"); 087 Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); 088 Assert.notNull(resolver, "resolver must not be null"); 089 090 this.clientInboundChannel = clientInboundChannel; 091 this.brokerChannel = brokerChannel; 092 this.messagingTemplate = new SimpMessagingTemplate(brokerChannel); 093 this.destinationResolver = resolver; 094 } 095 096 097 /** 098 * Return the configured {@link UserDestinationResolver}. 099 */ 100 public UserDestinationResolver getUserDestinationResolver() { 101 return this.destinationResolver; 102 } 103 104 /** 105 * Set a destination to broadcast messages to that remain unresolved because 106 * the user is not connected. In a multi-application server scenario this 107 * gives other application servers a chance to try. 108 * <p>By default this is not set. 109 * @param destination the target destination. 110 */ 111 public void setBroadcastDestination(@Nullable String destination) { 112 this.broadcastHandler = (StringUtils.hasText(destination) ? 113 new BroadcastHandler(this.messagingTemplate, destination) : null); 114 } 115 116 /** 117 * Return the configured destination for unresolved messages. 118 */ 119 @Nullable 120 public String getBroadcastDestination() { 121 return (this.broadcastHandler != null ? this.broadcastHandler.getBroadcastDestination() : null); 122 } 123 124 /** 125 * Return the messaging template used to send resolved messages to the 126 * broker channel. 127 */ 128 public MessageSendingOperations<String> getBrokerMessagingTemplate() { 129 return this.messagingTemplate; 130 } 131 132 /** 133 * Configure a custom {@link MessageHeaderInitializer} to initialize the 134 * headers of resolved target messages. 135 * <p>By default this is not set. 136 */ 137 public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer) { 138 this.headerInitializer = headerInitializer; 139 } 140 141 /** 142 * Return the configured header initializer. 143 */ 144 @Nullable 145 public MessageHeaderInitializer getHeaderInitializer() { 146 return this.headerInitializer; 147 } 148 149 150 @Override 151 public final void start() { 152 synchronized (this.lifecycleMonitor) { 153 this.clientInboundChannel.subscribe(this); 154 this.brokerChannel.subscribe(this); 155 this.running = true; 156 } 157 } 158 159 @Override 160 public final void stop() { 161 synchronized (this.lifecycleMonitor) { 162 this.running = false; 163 this.clientInboundChannel.unsubscribe(this); 164 this.brokerChannel.unsubscribe(this); 165 } 166 } 167 168 @Override 169 public final void stop(Runnable callback) { 170 synchronized (this.lifecycleMonitor) { 171 stop(); 172 callback.run(); 173 } 174 } 175 176 @Override 177 public final boolean isRunning() { 178 return this.running; 179 } 180 181 182 @Override 183 public void handleMessage(Message<?> message) throws MessagingException { 184 Message<?> messageToUse = message; 185 if (this.broadcastHandler != null) { 186 messageToUse = this.broadcastHandler.preHandle(message); 187 if (messageToUse == null) { 188 return; 189 } 190 } 191 192 UserDestinationResult result = this.destinationResolver.resolveDestination(messageToUse); 193 if (result == null) { 194 return; 195 } 196 197 if (result.getTargetDestinations().isEmpty()) { 198 if (logger.isTraceEnabled()) { 199 logger.trace("No active sessions for user destination: " + result.getSourceDestination()); 200 } 201 if (this.broadcastHandler != null) { 202 this.broadcastHandler.handleUnresolved(messageToUse); 203 } 204 return; 205 } 206 207 SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(messageToUse); 208 initHeaders(accessor); 209 accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination()); 210 accessor.setLeaveMutable(true); 211 212 messageToUse = MessageBuilder.createMessage(messageToUse.getPayload(), accessor.getMessageHeaders()); 213 if (logger.isTraceEnabled()) { 214 logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations()); 215 } 216 for (String target : result.getTargetDestinations()) { 217 this.messagingTemplate.send(target, messageToUse); 218 } 219 } 220 221 private void initHeaders(SimpMessageHeaderAccessor headerAccessor) { 222 if (getHeaderInitializer() != null) { 223 getHeaderInitializer().initHeaders(headerAccessor); 224 } 225 } 226 227 @Override 228 public String toString() { 229 return "UserDestinationMessageHandler[" + this.destinationResolver + "]"; 230 } 231 232 233 /** 234 * A handler that broadcasts locally unresolved messages to the broker and 235 * also handles similar broadcasts received from the broker. 236 */ 237 private static class BroadcastHandler { 238 239 private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id"); 240 241 private final MessageSendingOperations<String> messagingTemplate; 242 243 private final String broadcastDestination; 244 245 public BroadcastHandler(MessageSendingOperations<String> template, String destination) { 246 this.messagingTemplate = template; 247 this.broadcastDestination = destination; 248 } 249 250 public String getBroadcastDestination() { 251 return this.broadcastDestination; 252 } 253 254 @Nullable 255 public Message<?> preHandle(Message<?> message) throws MessagingException { 256 String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 257 if (!getBroadcastDestination().equals(destination)) { 258 return message; 259 } 260 SimpMessageHeaderAccessor accessor = 261 SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); 262 Assert.state(accessor != null, "No SimpMessageHeaderAccessor"); 263 if (accessor.getSessionId() == null) { 264 // Our own broadcast 265 return null; 266 } 267 destination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); 268 if (logger.isTraceEnabled()) { 269 logger.trace("Checking unresolved user destination: " + destination); 270 } 271 SimpMessageHeaderAccessor newAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 272 for (String name : accessor.toNativeHeaderMap().keySet()) { 273 if (NO_COPY_LIST.contains(name)) { 274 continue; 275 } 276 newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name)); 277 } 278 if (destination != null) { 279 newAccessor.setDestination(destination); 280 } 281 newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block 282 return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders()); 283 } 284 285 public void handleUnresolved(Message<?> message) { 286 MessageHeaders headers = message.getHeaders(); 287 if (SimpMessageHeaderAccessor.getFirstNativeHeader( 288 SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, headers) != null) { 289 // Re-broadcast 290 return; 291 } 292 SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message); 293 String destination = accessor.getDestination(); 294 accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, destination); 295 accessor.setLeaveMutable(true); 296 message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()); 297 if (logger.isTraceEnabled()) { 298 logger.trace("Translated " + destination + " -> " + getBroadcastDestination()); 299 } 300 this.messagingTemplate.send(getBroadcastDestination(), message); 301 } 302 } 303 304}