001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp; 018 019import java.util.Map; 020 021import org.springframework.lang.Nullable; 022import org.springframework.messaging.Message; 023import org.springframework.messaging.MessageChannel; 024import org.springframework.messaging.MessageDeliveryException; 025import org.springframework.messaging.MessageHeaders; 026import org.springframework.messaging.MessagingException; 027import org.springframework.messaging.core.AbstractMessageSendingTemplate; 028import org.springframework.messaging.core.MessagePostProcessor; 029import org.springframework.messaging.support.MessageBuilder; 030import org.springframework.messaging.support.MessageHeaderAccessor; 031import org.springframework.messaging.support.MessageHeaderInitializer; 032import org.springframework.messaging.support.NativeMessageHeaderAccessor; 033import org.springframework.util.Assert; 034import org.springframework.util.StringUtils; 035 036/** 037 * An implementation of 038 * {@link org.springframework.messaging.simp.SimpMessageSendingOperations}. 039 * 040 * <p>Also provides methods for sending messages to a user. See 041 * {@link org.springframework.messaging.simp.user.UserDestinationResolver 042 * UserDestinationResolver} 043 * for more on user destinations. 044 * 045 * @author Rossen Stoyanchev 046 * @since 4.0 047 */ 048public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String> 049 implements SimpMessageSendingOperations { 050 051 private final MessageChannel messageChannel; 052 053 private String destinationPrefix = "/user/"; 054 055 private volatile long sendTimeout = -1; 056 057 @Nullable 058 private MessageHeaderInitializer headerInitializer; 059 060 061 /** 062 * Create a new {@link SimpMessagingTemplate} instance. 063 * @param messageChannel the message channel (never {@code null}) 064 */ 065 public SimpMessagingTemplate(MessageChannel messageChannel) { 066 Assert.notNull(messageChannel, "MessageChannel must not be null"); 067 this.messageChannel = messageChannel; 068 } 069 070 071 /** 072 * Return the configured message channel. 073 */ 074 public MessageChannel getMessageChannel() { 075 return this.messageChannel; 076 } 077 078 /** 079 * Configure the prefix to use for destinations targeting a specific user. 080 * <p>The default value is "/user/". 081 * @see org.springframework.messaging.simp.user.UserDestinationMessageHandler 082 */ 083 public void setUserDestinationPrefix(String prefix) { 084 Assert.hasText(prefix, "User destination prefix must not be empty"); 085 this.destinationPrefix = (prefix.endsWith("/") ? prefix : prefix + "/"); 086 087 } 088 089 /** 090 * Return the configured user destination prefix. 091 */ 092 public String getUserDestinationPrefix() { 093 return this.destinationPrefix; 094 } 095 096 /** 097 * Specify the timeout value to use for send operations (in milliseconds). 098 */ 099 public void setSendTimeout(long sendTimeout) { 100 this.sendTimeout = sendTimeout; 101 } 102 103 /** 104 * Return the configured send timeout (in milliseconds). 105 */ 106 public long getSendTimeout() { 107 return this.sendTimeout; 108 } 109 110 /** 111 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all 112 * messages created through the {@code SimpMessagingTemplate}. 113 * <p>By default, this property is not set. 114 */ 115 public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer) { 116 this.headerInitializer = headerInitializer; 117 } 118 119 /** 120 * Return the configured header initializer. 121 */ 122 @Nullable 123 public MessageHeaderInitializer getHeaderInitializer() { 124 return this.headerInitializer; 125 } 126 127 128 /** 129 * If the headers of the given message already contain a 130 * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor#DESTINATION_HEADER 131 * SimpMessageHeaderAccessor#DESTINATION_HEADER} then the message is sent without 132 * further changes. 133 * <p>If a destination header is not already present ,the message is sent 134 * to the configured {@link #setDefaultDestination(Object) defaultDestination} 135 * or an exception an {@code IllegalStateException} is raised if that isn't 136 * configured. 137 * @param message the message to send (never {@code null}) 138 */ 139 @Override 140 public void send(Message<?> message) { 141 Assert.notNull(message, "Message is required"); 142 String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 143 if (destination != null) { 144 sendInternal(message); 145 return; 146 } 147 doSend(getRequiredDefaultDestination(), message); 148 } 149 150 @Override 151 protected void doSend(String destination, Message<?> message) { 152 Assert.notNull(destination, "Destination must not be null"); 153 154 SimpMessageHeaderAccessor simpAccessor = 155 MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); 156 157 if (simpAccessor != null) { 158 if (simpAccessor.isMutable()) { 159 simpAccessor.setDestination(destination); 160 simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); 161 simpAccessor.setImmutable(); 162 sendInternal(message); 163 return; 164 } 165 else { 166 // Try and keep the original accessor type 167 simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message); 168 initHeaders(simpAccessor); 169 } 170 } 171 else { 172 simpAccessor = SimpMessageHeaderAccessor.wrap(message); 173 initHeaders(simpAccessor); 174 } 175 176 simpAccessor.setDestination(destination); 177 simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE); 178 message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders()); 179 sendInternal(message); 180 } 181 182 private void sendInternal(Message<?> message) { 183 String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); 184 Assert.notNull(destination, "Destination header required"); 185 186 long timeout = this.sendTimeout; 187 boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message)); 188 189 if (!sent) { 190 throw new MessageDeliveryException(message, 191 "Failed to send message to destination '" + destination + "' within timeout: " + timeout); 192 } 193 } 194 195 private void initHeaders(SimpMessageHeaderAccessor simpAccessor) { 196 if (getHeaderInitializer() != null) { 197 getHeaderInitializer().initHeaders(simpAccessor); 198 } 199 } 200 201 202 @Override 203 public void convertAndSendToUser(String user, String destination, Object payload) throws MessagingException { 204 convertAndSendToUser(user, destination, payload, (MessagePostProcessor) null); 205 } 206 207 @Override 208 public void convertAndSendToUser(String user, String destination, Object payload, 209 @Nullable Map<String, Object> headers) throws MessagingException { 210 211 convertAndSendToUser(user, destination, payload, headers, null); 212 } 213 214 @Override 215 public void convertAndSendToUser(String user, String destination, Object payload, 216 @Nullable MessagePostProcessor postProcessor) throws MessagingException { 217 218 convertAndSendToUser(user, destination, payload, null, postProcessor); 219 } 220 221 @Override 222 public void convertAndSendToUser(String user, String destination, Object payload, 223 @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) 224 throws MessagingException { 225 226 Assert.notNull(user, "User must not be null"); 227 Assert.isTrue(!user.contains("%2F"), "Invalid sequence \"%2F\" in user name: " + user); 228 user = StringUtils.replace(user, "/", "%2F"); 229 destination = destination.startsWith("/") ? destination : "/" + destination; 230 super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor); 231 } 232 233 234 /** 235 * Creates a new map and puts the given headers under the key 236 * {@link NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS}. 237 * effectively treats the input header map as headers to be sent out to the 238 * destination. 239 * <p>However if the given headers already contain the key 240 * {@code NATIVE_HEADERS NATIVE_HEADERS} then the same headers instance is 241 * returned without changes. 242 * <p>Also if the given headers were prepared and obtained with 243 * {@link SimpMessageHeaderAccessor#getMessageHeaders()} then the same headers 244 * instance is also returned without changes. 245 */ 246 @Override 247 protected Map<String, Object> processHeadersToSend(@Nullable Map<String, Object> headers) { 248 if (headers == null) { 249 SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 250 initHeaders(headerAccessor); 251 headerAccessor.setLeaveMutable(true); 252 return headerAccessor.getMessageHeaders(); 253 } 254 if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) { 255 return headers; 256 } 257 if (headers instanceof MessageHeaders) { 258 SimpMessageHeaderAccessor accessor = 259 MessageHeaderAccessor.getAccessor((MessageHeaders) headers, SimpMessageHeaderAccessor.class); 260 if (accessor != null) { 261 return headers; 262 } 263 } 264 265 SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); 266 initHeaders(headerAccessor); 267 headers.forEach((key, value) -> headerAccessor.setNativeHeader(key, (value != null ? value.toString() : null))); 268 return headerAccessor.getMessageHeaders(); 269 } 270 271}