001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp; 018 019import java.util.Map; 020 021import org.springframework.messaging.Message; 022import org.springframework.messaging.MessageChannel; 023import org.springframework.messaging.MessageDeliveryException; 024import org.springframework.messaging.MessageHeaders; 025import org.springframework.messaging.MessagingException; 026import org.springframework.messaging.core.AbstractMessageSendingTemplate; 027import org.springframework.messaging.core.MessagePostProcessor; 028import org.springframework.messaging.support.MessageBuilder; 029import org.springframework.messaging.support.MessageHeaderAccessor; 030import org.springframework.messaging.support.MessageHeaderInitializer; 031import org.springframework.messaging.support.NativeMessageHeaderAccessor; 032import org.springframework.util.Assert; 033import org.springframework.util.StringUtils; 034 035/** 036 * An implementation of 037 * {@link org.springframework.messaging.simp.SimpMessageSendingOperations}. 038 * 039 * <p>Also provides methods for sending messages to a user. See 040 * {@link org.springframework.messaging.simp.user.UserDestinationResolver 041 * UserDestinationResolver} 042 * for more on user destinations. 043 * 044 * @author Rossen Stoyanchev 045 * @since 4.0 046 */ 047public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String> 048 implements SimpMessageSendingOperations { 049 050 private final MessageChannel messageChannel; 051 052 private String destinationPrefix = "/user/"; 053 054 private volatile long sendTimeout = -1; 055 056 private MessageHeaderInitializer headerInitializer; 057 058 059 /** 060 * Create a new {@link SimpMessagingTemplate} instance. 061 * @param messageChannel the message channel (never {@code null}) 062 */ 063 public SimpMessagingTemplate(MessageChannel messageChannel) { 064 Assert.notNull(messageChannel, "MessageChannel must not be null"); 065 this.messageChannel = messageChannel; 066 } 067 068 069 /** 070 * Return the configured message channel. 071 */ 072 public MessageChannel getMessageChannel() { 073 return this.messageChannel; 074 } 075 076 /** 077 * Configure the prefix to use for destinations targeting a specific user. 078 * <p>The default value is "/user/". 079 * @see org.springframework.messaging.simp.user.UserDestinationMessageHandler 080 */ 081 public void setUserDestinationPrefix(String prefix) { 082 Assert.hasText(prefix, "User destination prefix must not be empty"); 083 this.destinationPrefix = (prefix.endsWith("/") ? prefix : prefix + "/"); 084 085 } 086 087 /** 088 * Return the configured user destination prefix. 089 */ 090 public String getUserDestinationPrefix() { 091 return this.destinationPrefix; 092 } 093 094 /** 095 * Specify the timeout value to use for send operations (in milliseconds). 096 */ 097 public void setSendTimeout(long sendTimeout) { 098 this.sendTimeout = sendTimeout; 099 } 100 101 /** 102 * Return the configured send timeout (in milliseconds). 103 */ 104 public long getSendTimeout() { 105 return this.sendTimeout; 106 } 107 108 /** 109 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all 110 * messages created through the {@code SimpMessagingTemplate}. 111 * <p>By default, this property is not set. 112 */ 113 public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { 114 this.headerInitializer = headerInitializer; 115 } 116 117 /** 118 * Return the configured header initializer. 119 */ 120 public MessageHeaderInitializer getHeaderInitializer() { 121 return this.headerInitializer; 122 } 123 124 125 /** 126 * If the headers of the given message already contain a 127 * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor#DESTINATION_HEADER 128 * SimpMessageHeaderAccessor#DESTINATION_HEADER} then the message is sent without 129 * further changes. 130 * <p>If a destination header is not already present ,the message is sent 131 * to the configured {@link #setDefaultDestination(Object) defaultDestination} 132 * or an exception an {@code IllegalStateException} is raised if that isn't 133 * configured. 134 * @param message the message to send (never {@code null}) 135 */ 136 @Override 137 public void send(Message<?> message) { 138 Assert.notNull(message, "Message is required"); 139 String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 140 if (destination != null) { 141 sendInternal(message); 142 return; 143 } 144 doSend(getRequiredDefaultDestination(), message); 145 } 146 147 @Override 148 protected void doSend(String destination, Message<?> message) { 149 Assert.notNull(destination, "Destination must not be null"); 150 151 SimpMessageHeaderAccessor simpAccessor = 152 MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); 153 154 if (simpAccessor != null) { 155 if (simpAccessor.isMutable()) { 156 simpAccessor.setDestination(destination); 157 simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); 158 simpAccessor.setImmutable(); 159 sendInternal(message); 160 return; 161 } 162 else { 163 // Try and keep the original accessor type 164 simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message); 165 initHeaders(simpAccessor); 166 } 167 } 168 else { 169 simpAccessor = SimpMessageHeaderAccessor.wrap(message); 170 initHeaders(simpAccessor); 171 } 172 173 simpAccessor.setDestination(destination); 174 simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); 175 message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders()); 176 sendInternal(message); 177 } 178 179 private void sendInternal(Message<?> message) { 180 String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 181 Assert.notNull(destination, "Destination header required"); 182 183 long timeout = this.sendTimeout; 184 boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message)); 185 186 if (!sent) { 187 throw new MessageDeliveryException(message, 188 "Failed to send message to destination '" + destination + "' within timeout: " + timeout); 189 } 190 } 191 192 private void initHeaders(SimpMessageHeaderAccessor simpAccessor) { 193 if (getHeaderInitializer() != null) { 194 getHeaderInitializer().initHeaders(simpAccessor); 195 } 196 } 197 198 199 @Override 200 public void convertAndSendToUser(String user, String destination, Object payload) throws MessagingException { 201 convertAndSendToUser(user, destination, payload, (MessagePostProcessor) null); 202 } 203 204 @Override 205 public void convertAndSendToUser(String user, String destination, Object payload, 206 Map<String, Object> headers) throws MessagingException { 207 208 convertAndSendToUser(user, destination, payload, headers, null); 209 } 210 211 @Override 212 public void convertAndSendToUser(String user, String destination, Object payload, 213 MessagePostProcessor postProcessor) throws MessagingException { 214 215 convertAndSendToUser(user, destination, payload, null, postProcessor); 216 } 217 218 @Override 219 public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers, 220 MessagePostProcessor postProcessor) throws MessagingException { 221 222 Assert.notNull(user, "User must not be null"); 223 user = StringUtils.replace(user, "/", "%2F"); 224 destination = destination.startsWith("/") ? destination : "/" + destination; 225 super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor); 226 } 227 228 229 /** 230 * Creates a new map and puts the given headers under the key 231 * {@link org.springframework.messaging.support.NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS}. 232 * effectively treats the input header map as headers to be sent out to the 233 * destination. 234 * <p>However if the given headers already contain the key 235 * {@code NATIVE_HEADERS NATIVE_HEADERS} then the same headers instance is 236 * returned without changes. 237 * <p>Also if the given headers were prepared and obtained with 238 * {@link SimpMessageHeaderAccessor#getMessageHeaders()} then the same headers 239 * instance is also returned without changes. 240 */ 241 @Override 242 protected Map<String, Object> processHeadersToSend(Map<String, Object> headers) { 243 if (headers == null) { 244 SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 245 initHeaders(headerAccessor); 246 headerAccessor.setLeaveMutable(true); 247 return headerAccessor.getMessageHeaders(); 248 } 249 if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) { 250 return headers; 251 } 252 if (headers instanceof MessageHeaders) { 253 SimpMessageHeaderAccessor accessor = 254 MessageHeaderAccessor.getAccessor((MessageHeaders) headers, SimpMessageHeaderAccessor.class); 255 if (accessor != null) { 256 return headers; 257 } 258 } 259 260 SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 261 initHeaders(headerAccessor); 262 for (Map.Entry<String, Object> headerEntry : headers.entrySet()) { 263 Object value = headerEntry.getValue(); 264 headerAccessor.setNativeHeader(headerEntry.getKey(), (value != null ? value.toString() : null)); 265 } 266 return headerAccessor.getMessageHeaders(); 267 } 268 269}