001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp;
018
019import java.util.Map;
020
021import org.springframework.lang.Nullable;
022import org.springframework.messaging.Message;
023import org.springframework.messaging.MessageChannel;
024import org.springframework.messaging.MessageDeliveryException;
025import org.springframework.messaging.MessageHeaders;
026import org.springframework.messaging.MessagingException;
027import org.springframework.messaging.core.AbstractMessageSendingTemplate;
028import org.springframework.messaging.core.MessagePostProcessor;
029import org.springframework.messaging.support.MessageBuilder;
030import org.springframework.messaging.support.MessageHeaderAccessor;
031import org.springframework.messaging.support.MessageHeaderInitializer;
032import org.springframework.messaging.support.NativeMessageHeaderAccessor;
033import org.springframework.util.Assert;
034import org.springframework.util.StringUtils;
035
036/**
037 * An implementation of
038 * {@link org.springframework.messaging.simp.SimpMessageSendingOperations}.
039 *
040 * <p>Also provides methods for sending messages to a user. See
041 * {@link org.springframework.messaging.simp.user.UserDestinationResolver
042 * UserDestinationResolver}
043 * for more on user destinations.
044 *
045 * @author Rossen Stoyanchev
046 * @since 4.0
047 */
048public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String>
049                implements SimpMessageSendingOperations {
050
051        private final MessageChannel messageChannel;
052
053        private String destinationPrefix = "/user/";
054
055        private volatile long sendTimeout = -1;
056
057        @Nullable
058        private MessageHeaderInitializer headerInitializer;
059
060
061        /**
062         * Create a new {@link SimpMessagingTemplate} instance.
063         * @param messageChannel the message channel (never {@code null})
064         */
065        public SimpMessagingTemplate(MessageChannel messageChannel) {
066                Assert.notNull(messageChannel, "MessageChannel must not be null");
067                this.messageChannel = messageChannel;
068        }
069
070
071        /**
072         * Return the configured message channel.
073         */
074        public MessageChannel getMessageChannel() {
075                return this.messageChannel;
076        }
077
078        /**
079         * Configure the prefix to use for destinations targeting a specific user.
080         * <p>The default value is "/user/".
081         * @see org.springframework.messaging.simp.user.UserDestinationMessageHandler
082         */
083        public void setUserDestinationPrefix(String prefix) {
084                Assert.hasText(prefix, "User destination prefix must not be empty");
085                this.destinationPrefix = (prefix.endsWith("/") ? prefix : prefix + "/");
086
087        }
088
089        /**
090         * Return the configured user destination prefix.
091         */
092        public String getUserDestinationPrefix() {
093                return this.destinationPrefix;
094        }
095
096        /**
097         * Specify the timeout value to use for send operations (in milliseconds).
098         */
099        public void setSendTimeout(long sendTimeout) {
100                this.sendTimeout = sendTimeout;
101        }
102
103        /**
104         * Return the configured send timeout (in milliseconds).
105         */
106        public long getSendTimeout() {
107                return this.sendTimeout;
108        }
109
110        /**
111         * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
112         * messages created through the {@code SimpMessagingTemplate}.
113         * <p>By default, this property is not set.
114         */
115        public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer) {
116                this.headerInitializer = headerInitializer;
117        }
118
119        /**
120         * Return the configured header initializer.
121         */
122        @Nullable
123        public MessageHeaderInitializer getHeaderInitializer() {
124                return this.headerInitializer;
125        }
126
127
128        /**
129         * If the headers of the given message already contain a
130         * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor#DESTINATION_HEADER
131         * SimpMessageHeaderAccessor#DESTINATION_HEADER} then the message is sent without
132         * further changes.
133         * <p>If a destination header is not already present ,the message is sent
134         * to the configured {@link #setDefaultDestination(Object) defaultDestination}
135         * or an exception an {@code IllegalStateException} is raised if that isn't
136         * configured.
137         * @param message the message to send (never {@code null})
138         */
139        @Override
140        public void send(Message<?> message) {
141                Assert.notNull(message, "Message is required");
142                String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
143                if (destination != null) {
144                        sendInternal(message);
145                        return;
146                }
147                doSend(getRequiredDefaultDestination(), message);
148        }
149
150        @Override
151        protected void doSend(String destination, Message<?> message) {
152                Assert.notNull(destination, "Destination must not be null");
153
154                SimpMessageHeaderAccessor simpAccessor =
155                                MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
156
157                if (simpAccessor != null) {
158                        if (simpAccessor.isMutable()) {
159                                simpAccessor.setDestination(destination);
160                                simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
161                                simpAccessor.setImmutable();
162                                sendInternal(message);
163                                return;
164                        }
165                        else {
166                                // Try and keep the original accessor type
167                                simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);
168                                initHeaders(simpAccessor);
169                        }
170                }
171                else {
172                        simpAccessor = SimpMessageHeaderAccessor.wrap(message);
173                        initHeaders(simpAccessor);
174                }
175
176                simpAccessor.setDestination(destination);
177                simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
178                message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());
179                sendInternal(message);
180        }
181
182        private void sendInternal(Message<?> message) {
183                String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
184                Assert.notNull(destination, "Destination header required");
185
186                long timeout = this.sendTimeout;
187                boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
188
189                if (!sent) {
190                        throw new MessageDeliveryException(message,
191                                        "Failed to send message to destination '" + destination + "' within timeout: " + timeout);
192                }
193        }
194
195        private void initHeaders(SimpMessageHeaderAccessor simpAccessor) {
196                if (getHeaderInitializer() != null) {
197                        getHeaderInitializer().initHeaders(simpAccessor);
198                }
199        }
200
201
202        @Override
203        public void convertAndSendToUser(String user, String destination, Object payload) throws MessagingException {
204                convertAndSendToUser(user, destination, payload, (MessagePostProcessor) null);
205        }
206
207        @Override
208        public void convertAndSendToUser(String user, String destination, Object payload,
209                        @Nullable Map<String, Object> headers) throws MessagingException {
210
211                convertAndSendToUser(user, destination, payload, headers, null);
212        }
213
214        @Override
215        public void convertAndSendToUser(String user, String destination, Object payload,
216                        @Nullable MessagePostProcessor postProcessor) throws MessagingException {
217
218                convertAndSendToUser(user, destination, payload, null, postProcessor);
219        }
220
221        @Override
222        public void convertAndSendToUser(String user, String destination, Object payload,
223                        @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor)
224                        throws MessagingException {
225
226                Assert.notNull(user, "User must not be null");
227                Assert.isTrue(!user.contains("%2F"), "Invalid sequence \"%2F\" in user name: " + user);
228                user = StringUtils.replace(user, "/", "%2F");
229                destination = destination.startsWith("/") ? destination : "/" + destination;
230                super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
231        }
232
233
234        /**
235         * Creates a new map and puts the given headers under the key
236         * {@link NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS}.
237         * effectively treats the input header map as headers to be sent out to the
238         * destination.
239         * <p>However if the given headers already contain the key
240         * {@code NATIVE_HEADERS NATIVE_HEADERS} then the same headers instance is
241         * returned without changes.
242         * <p>Also if the given headers were prepared and obtained with
243         * {@link SimpMessageHeaderAccessor#getMessageHeaders()} then the same headers
244         * instance is also returned without changes.
245         */
246        @Override
247        protected Map<String, Object> processHeadersToSend(@Nullable Map<String, Object> headers) {
248                if (headers == null) {
249                        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
250                        initHeaders(headerAccessor);
251                        headerAccessor.setLeaveMutable(true);
252                        return headerAccessor.getMessageHeaders();
253                }
254                if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) {
255                        return headers;
256                }
257                if (headers instanceof MessageHeaders) {
258                        SimpMessageHeaderAccessor accessor =
259                                        MessageHeaderAccessor.getAccessor((MessageHeaders) headers, SimpMessageHeaderAccessor.class);
260                        if (accessor != null) {
261                                return headers;
262                        }
263                }
264
265                SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
266                initHeaders(headerAccessor);
267                headers.forEach((key, value) -> headerAccessor.setNativeHeader(key, (value != null ? value.toString() : null)));
268                return headerAccessor.getMessageHeaders();
269        }
270
271}