001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.user; 018 019import java.util.Arrays; 020import java.util.List; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024 025import org.springframework.context.SmartLifecycle; 026import org.springframework.messaging.Message; 027import org.springframework.messaging.MessageHandler; 028import org.springframework.messaging.MessageHeaders; 029import org.springframework.messaging.MessagingException; 030import org.springframework.messaging.SubscribableChannel; 031import org.springframework.messaging.core.MessageSendingOperations; 032import org.springframework.messaging.simp.SimpMessageHeaderAccessor; 033import org.springframework.messaging.simp.SimpMessageType; 034import org.springframework.messaging.simp.SimpMessagingTemplate; 035import org.springframework.messaging.support.MessageBuilder; 036import org.springframework.messaging.support.MessageHeaderInitializer; 037import org.springframework.util.Assert; 038import org.springframework.util.StringUtils; 039 040/** 041 * {@code MessageHandler} with support for "user" destinations. 042 * 043 * <p>Listens for messages with "user" destinations, translates their destination 044 * to actual target destinations unique to the active session(s) of a user, and 045 * then sends the resolved messages to the broker channel to be delivered. 046 * 047 * @author Rossen Stoyanchev 048 * @since 4.0 049 */ 050public class UserDestinationMessageHandler implements MessageHandler, SmartLifecycle { 051 052 private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class); 053 054 055 private final SubscribableChannel clientInboundChannel; 056 057 private final SubscribableChannel brokerChannel; 058 059 private final UserDestinationResolver destinationResolver; 060 061 private final MessageSendingOperations<String> messagingTemplate; 062 063 private BroadcastHandler broadcastHandler; 064 065 private MessageHeaderInitializer headerInitializer; 066 067 private volatile boolean running = false; 068 069 private final Object lifecycleMonitor = new Object(); 070 071 072 /** 073 * Create an instance with the given client and broker channels subscribing 074 * to handle messages from each and then sending any resolved messages to the 075 * broker channel. 076 * @param clientInboundChannel messages received from clients. 077 * @param brokerChannel messages sent to the broker. 078 * @param resolver the resolver for "user" destinations. 079 */ 080 public UserDestinationMessageHandler(SubscribableChannel clientInboundChannel, 081 SubscribableChannel brokerChannel, UserDestinationResolver resolver) { 082 083 Assert.notNull(clientInboundChannel, "'clientInChannel' must not be null"); 084 Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); 085 Assert.notNull(resolver, "resolver must not be null"); 086 087 this.clientInboundChannel = clientInboundChannel; 088 this.brokerChannel = brokerChannel; 089 this.messagingTemplate = new SimpMessagingTemplate(brokerChannel); 090 this.destinationResolver = resolver; 091 } 092 093 094 /** 095 * Return the configured {@link UserDestinationResolver}. 096 */ 097 public UserDestinationResolver getUserDestinationResolver() { 098 return this.destinationResolver; 099 } 100 101 /** 102 * Set a destination to broadcast messages to that remain unresolved because 103 * the user is not connected. In a multi-application server scenario this 104 * gives other application servers a chance to try. 105 * <p>By default this is not set. 106 * @param destination the target destination. 107 */ 108 public void setBroadcastDestination(String destination) { 109 this.broadcastHandler = (StringUtils.hasText(destination) ? 110 new BroadcastHandler(this.messagingTemplate, destination) : null); 111 } 112 113 /** 114 * Return the configured destination for unresolved messages. 115 */ 116 public String getBroadcastDestination() { 117 return (this.broadcastHandler != null ? this.broadcastHandler.getBroadcastDestination() : null); 118 } 119 120 /** 121 * Return the messaging template used to send resolved messages to the 122 * broker channel. 123 */ 124 public MessageSendingOperations<String> getBrokerMessagingTemplate() { 125 return this.messagingTemplate; 126 } 127 128 /** 129 * Configure a custom {@link MessageHeaderInitializer} to initialize the 130 * headers of resolved target messages. 131 * <p>By default this is not set. 132 */ 133 public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { 134 this.headerInitializer = headerInitializer; 135 } 136 137 /** 138 * Return the configured header initializer. 139 */ 140 public MessageHeaderInitializer getHeaderInitializer() { 141 return this.headerInitializer; 142 } 143 144 145 @Override 146 public boolean isAutoStartup() { 147 return true; 148 } 149 150 @Override 151 public int getPhase() { 152 return Integer.MAX_VALUE; 153 } 154 155 @Override 156 public final void start() { 157 synchronized (this.lifecycleMonitor) { 158 this.clientInboundChannel.subscribe(this); 159 this.brokerChannel.subscribe(this); 160 this.running = true; 161 } 162 } 163 164 @Override 165 public final void stop() { 166 synchronized (this.lifecycleMonitor) { 167 this.running = false; 168 this.clientInboundChannel.unsubscribe(this); 169 this.brokerChannel.unsubscribe(this); 170 } 171 } 172 173 @Override 174 public final void stop(Runnable callback) { 175 synchronized (this.lifecycleMonitor) { 176 stop(); 177 callback.run(); 178 } 179 } 180 181 @Override 182 public final boolean isRunning() { 183 return this.running; 184 } 185 186 187 @Override 188 public void handleMessage(Message<?> message) throws MessagingException { 189 if (this.broadcastHandler != null) { 190 message = this.broadcastHandler.preHandle(message); 191 if (message == null) { 192 return; 193 } 194 } 195 UserDestinationResult result = this.destinationResolver.resolveDestination(message); 196 if (result == null) { 197 return; 198 } 199 if (result.getTargetDestinations().isEmpty()) { 200 if (logger.isTraceEnabled()) { 201 logger.trace("No active sessions for user destination: " + result.getSourceDestination()); 202 } 203 if (this.broadcastHandler != null) { 204 this.broadcastHandler.handleUnresolved(message); 205 } 206 return; 207 } 208 SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message); 209 initHeaders(accessor); 210 accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination()); 211 accessor.setLeaveMutable(true); 212 message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()); 213 if (logger.isTraceEnabled()) { 214 logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations()); 215 } 216 for (String target : result.getTargetDestinations()) { 217 this.messagingTemplate.send(target, message); 218 } 219 } 220 221 private void initHeaders(SimpMessageHeaderAccessor headerAccessor) { 222 if (getHeaderInitializer() != null) { 223 getHeaderInitializer().initHeaders(headerAccessor); 224 } 225 } 226 227 @Override 228 public String toString() { 229 return "UserDestinationMessageHandler[" + this.destinationResolver + "]"; 230 } 231 232 233 /** 234 * A handler that broadcasts locally unresolved messages to the broker and 235 * also handles similar broadcasts received from the broker. 236 */ 237 private static class BroadcastHandler { 238 239 private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id"); 240 241 private final MessageSendingOperations<String> messagingTemplate; 242 243 private final String broadcastDestination; 244 245 public BroadcastHandler(MessageSendingOperations<String> template, String destination) { 246 this.messagingTemplate = template; 247 this.broadcastDestination = destination; 248 } 249 250 public String getBroadcastDestination() { 251 return this.broadcastDestination; 252 } 253 254 public Message<?> preHandle(Message<?> message) throws MessagingException { 255 String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 256 if (!getBroadcastDestination().equals(destination)) { 257 return message; 258 } 259 SimpMessageHeaderAccessor accessor = 260 SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); 261 if (accessor.getSessionId() == null) { 262 // Our own broadcast 263 return null; 264 } 265 destination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); 266 if (logger.isTraceEnabled()) { 267 logger.trace("Checking unresolved user destination: " + destination); 268 } 269 SimpMessageHeaderAccessor newAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 270 for (String name : accessor.toNativeHeaderMap().keySet()) { 271 if (NO_COPY_LIST.contains(name)) { 272 continue; 273 } 274 newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name)); 275 } 276 newAccessor.setDestination(destination); 277 newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block 278 return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders()); 279 } 280 281 public void handleUnresolved(Message<?> message) { 282 MessageHeaders headers = message.getHeaders(); 283 if (SimpMessageHeaderAccessor.getFirstNativeHeader( 284 SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, headers) != null) { 285 // Re-broadcast 286 return; 287 } 288 SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message); 289 String destination = accessor.getDestination(); 290 accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, destination); 291 accessor.setLeaveMutable(true); 292 message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()); 293 if (logger.isTraceEnabled()) { 294 logger.trace("Translated " + destination + " -> " + getBroadcastDestination()); 295 } 296 this.messagingTemplate.send(getBroadcastDestination(), message); 297 } 298 } 299 300}