001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp;
018
019import java.util.Map;
020
021import org.springframework.messaging.Message;
022import org.springframework.messaging.MessageChannel;
023import org.springframework.messaging.MessageDeliveryException;
024import org.springframework.messaging.MessageHeaders;
025import org.springframework.messaging.MessagingException;
026import org.springframework.messaging.core.AbstractMessageSendingTemplate;
027import org.springframework.messaging.core.MessagePostProcessor;
028import org.springframework.messaging.support.MessageBuilder;
029import org.springframework.messaging.support.MessageHeaderAccessor;
030import org.springframework.messaging.support.MessageHeaderInitializer;
031import org.springframework.messaging.support.NativeMessageHeaderAccessor;
032import org.springframework.util.Assert;
033import org.springframework.util.StringUtils;
034
035/**
036 * An implementation of
037 * {@link org.springframework.messaging.simp.SimpMessageSendingOperations}.
038 *
039 * <p>Also provides methods for sending messages to a user. See
040 * {@link org.springframework.messaging.simp.user.UserDestinationResolver
041 * UserDestinationResolver}
042 * for more on user destinations.
043 *
044 * @author Rossen Stoyanchev
045 * @since 4.0
046 */
047public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String>
048                implements SimpMessageSendingOperations {
049
050        private final MessageChannel messageChannel;
051
052        private String destinationPrefix = "/user/";
053
054        private volatile long sendTimeout = -1;
055
056        private MessageHeaderInitializer headerInitializer;
057
058
059        /**
060         * Create a new {@link SimpMessagingTemplate} instance.
061         * @param messageChannel the message channel (never {@code null})
062         */
063        public SimpMessagingTemplate(MessageChannel messageChannel) {
064                Assert.notNull(messageChannel, "MessageChannel must not be null");
065                this.messageChannel = messageChannel;
066        }
067
068
069        /**
070         * Return the configured message channel.
071         */
072        public MessageChannel getMessageChannel() {
073                return this.messageChannel;
074        }
075
076        /**
077         * Configure the prefix to use for destinations targeting a specific user.
078         * <p>The default value is "/user/".
079         * @see org.springframework.messaging.simp.user.UserDestinationMessageHandler
080         */
081        public void setUserDestinationPrefix(String prefix) {
082                Assert.hasText(prefix, "User destination prefix must not be empty");
083                this.destinationPrefix = (prefix.endsWith("/") ? prefix : prefix + "/");
084
085        }
086
087        /**
088         * Return the configured user destination prefix.
089         */
090        public String getUserDestinationPrefix() {
091                return this.destinationPrefix;
092        }
093
094        /**
095         * Specify the timeout value to use for send operations (in milliseconds).
096         */
097        public void setSendTimeout(long sendTimeout) {
098                this.sendTimeout = sendTimeout;
099        }
100
101        /**
102         * Return the configured send timeout (in milliseconds).
103         */
104        public long getSendTimeout() {
105                return this.sendTimeout;
106        }
107
108        /**
109         * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
110         * messages created through the {@code SimpMessagingTemplate}.
111         * <p>By default, this property is not set.
112         */
113        public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
114                this.headerInitializer = headerInitializer;
115        }
116
117        /**
118         * Return the configured header initializer.
119         */
120        public MessageHeaderInitializer getHeaderInitializer() {
121                return this.headerInitializer;
122        }
123
124
125        /**
126         * If the headers of the given message already contain a
127         * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor#DESTINATION_HEADER
128         * SimpMessageHeaderAccessor#DESTINATION_HEADER} then the message is sent without
129         * further changes.
130         * <p>If a destination header is not already present ,the message is sent
131         * to the configured {@link #setDefaultDestination(Object) defaultDestination}
132         * or an exception an {@code IllegalStateException} is raised if that isn't
133         * configured.
134         * @param message the message to send (never {@code null})
135         */
136        @Override
137        public void send(Message<?> message) {
138                Assert.notNull(message, "Message is required");
139                String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
140                if (destination != null) {
141                        sendInternal(message);
142                        return;
143                }
144                doSend(getRequiredDefaultDestination(), message);
145        }
146
147        @Override
148        protected void doSend(String destination, Message<?> message) {
149                Assert.notNull(destination, "Destination must not be null");
150
151                SimpMessageHeaderAccessor simpAccessor =
152                                MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
153
154                if (simpAccessor != null) {
155                        if (simpAccessor.isMutable()) {
156                                simpAccessor.setDestination(destination);
157                                simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
158                                simpAccessor.setImmutable();
159                                sendInternal(message);
160                                return;
161                        }
162                        else {
163                                // Try and keep the original accessor type
164                                simpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);
165                                initHeaders(simpAccessor);
166                        }
167                }
168                else {
169                        simpAccessor = SimpMessageHeaderAccessor.wrap(message);
170                        initHeaders(simpAccessor);
171                }
172
173                simpAccessor.setDestination(destination);
174                simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
175                message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());
176                sendInternal(message);
177        }
178
179        private void sendInternal(Message<?> message) {
180                String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
181                Assert.notNull(destination, "Destination header required");
182
183                long timeout = this.sendTimeout;
184                boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
185
186                if (!sent) {
187                        throw new MessageDeliveryException(message,
188                                        "Failed to send message to destination '" + destination + "' within timeout: " + timeout);
189                }
190        }
191
192        private void initHeaders(SimpMessageHeaderAccessor simpAccessor) {
193                if (getHeaderInitializer() != null) {
194                        getHeaderInitializer().initHeaders(simpAccessor);
195                }
196        }
197
198
199        @Override
200        public void convertAndSendToUser(String user, String destination, Object payload) throws MessagingException {
201                convertAndSendToUser(user, destination, payload, (MessagePostProcessor) null);
202        }
203
204        @Override
205        public void convertAndSendToUser(String user, String destination, Object payload,
206                        Map<String, Object> headers) throws MessagingException {
207
208                convertAndSendToUser(user, destination, payload, headers, null);
209        }
210
211        @Override
212        public void convertAndSendToUser(String user, String destination, Object payload,
213                        MessagePostProcessor postProcessor) throws MessagingException {
214
215                convertAndSendToUser(user, destination, payload, null, postProcessor);
216        }
217
218        @Override
219        public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers,
220                        MessagePostProcessor postProcessor) throws MessagingException {
221
222                Assert.notNull(user, "User must not be null");
223                user = StringUtils.replace(user, "/", "%2F");
224                destination = destination.startsWith("/") ? destination : "/" + destination;
225                super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
226        }
227
228
229        /**
230         * Creates a new map and puts the given headers under the key
231         * {@link org.springframework.messaging.support.NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS NATIVE_HEADERS}.
232         * effectively treats the input header map as headers to be sent out to the
233         * destination.
234         * <p>However if the given headers already contain the key
235         * {@code NATIVE_HEADERS NATIVE_HEADERS} then the same headers instance is
236         * returned without changes.
237         * <p>Also if the given headers were prepared and obtained with
238         * {@link SimpMessageHeaderAccessor#getMessageHeaders()} then the same headers
239         * instance is also returned without changes.
240         */
241        @Override
242        protected Map<String, Object> processHeadersToSend(Map<String, Object> headers) {
243                if (headers == null) {
244                        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
245                        initHeaders(headerAccessor);
246                        headerAccessor.setLeaveMutable(true);
247                        return headerAccessor.getMessageHeaders();
248                }
249                if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) {
250                        return headers;
251                }
252                if (headers instanceof MessageHeaders) {
253                        SimpMessageHeaderAccessor accessor =
254                                        MessageHeaderAccessor.getAccessor((MessageHeaders) headers, SimpMessageHeaderAccessor.class);
255                        if (accessor != null) {
256                                return headers;
257                        }
258                }
259
260                SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
261                initHeaders(headerAccessor);
262                for (Map.Entry<String, Object> headerEntry : headers.entrySet()) {
263                        Object value = headerEntry.getValue();
264                        headerAccessor.setNativeHeader(headerEntry.getKey(), (value != null ? value.toString() : null));
265                }
266                return headerAccessor.getMessageHeaders();
267        }
268
269}