001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.core;
018
019import java.util.concurrent.CountDownLatch;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024
025import org.springframework.beans.BeansException;
026import org.springframework.beans.factory.BeanFactory;
027import org.springframework.beans.factory.BeanFactoryAware;
028import org.springframework.lang.Nullable;
029import org.springframework.messaging.Message;
030import org.springframework.messaging.MessageChannel;
031import org.springframework.messaging.MessageDeliveryException;
032import org.springframework.messaging.MessageHeaders;
033import org.springframework.messaging.PollableChannel;
034import org.springframework.messaging.support.MessageBuilder;
035import org.springframework.messaging.support.MessageHeaderAccessor;
036import org.springframework.util.Assert;
037
038/**
 * A messaging template that resolves destination names to {@link MessageChannel MessageChannels}
 * to send messages to and receive messages from.
041 *
042 * @author Mark Fisher
043 * @author Rossen Stoyanchev
044 * @author Gary Russell
045 * @since 4.0
046 */
047public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel>
048                implements BeanFactoryAware {
049
050        /**
051         * The default header key used for a send timeout.
052         */
053        public static final String DEFAULT_SEND_TIMEOUT_HEADER = "sendTimeout";
054
055        /**
056         * The default header key used for a receive timeout.
057         */
058        public static final String DEFAULT_RECEIVE_TIMEOUT_HEADER = "receiveTimeout";
059
060        private volatile long sendTimeout = -1;
061
062        private volatile long receiveTimeout = -1;
063
064        private String sendTimeoutHeader = DEFAULT_SEND_TIMEOUT_HEADER;
065
066        private String receiveTimeoutHeader = DEFAULT_RECEIVE_TIMEOUT_HEADER;
067
068        private volatile boolean throwExceptionOnLateReply = false;
069
070
071        /**
072         * Configure the default timeout value to use for send operations.
073         * May be overridden for individual messages.
074         * @param sendTimeout the send timeout in milliseconds
075         * @see #setSendTimeoutHeader(String)
076         */
077        public void setSendTimeout(long sendTimeout) {
078                this.sendTimeout = sendTimeout;
079        }
080
081        /**
082         * Return the configured default send operation timeout value.
083         */
084        public long getSendTimeout() {
085                return this.sendTimeout;
086        }
087
088        /**
089         * Configure the default timeout value to use for receive operations.
090         * May be overridden for individual messages when using sendAndReceive
091         * operations.
092         * @param receiveTimeout the receive timeout in milliseconds
093         * @see #setReceiveTimeoutHeader(String)
094         */
095        public void setReceiveTimeout(long receiveTimeout) {
096                this.receiveTimeout = receiveTimeout;
097        }
098
099        /**
100         * Return the configured receive operation timeout value.
101         */
102        public long getReceiveTimeout() {
103                return this.receiveTimeout;
104        }
105
106        /**
107         * Set the name of the header used to determine the send timeout (if present).
108         * Default {@value #DEFAULT_SEND_TIMEOUT_HEADER}.
109         * <p>The header is removed before sending the message to avoid propagation.
110         * @since 5.0
111         */
112        public void setSendTimeoutHeader(String sendTimeoutHeader) {
113                Assert.notNull(sendTimeoutHeader, "'sendTimeoutHeader' cannot be null");
114                this.sendTimeoutHeader = sendTimeoutHeader;
115        }
116
117        /**
118         * Return the configured send-timeout header.
119         * @since 5.0
120         */
121        public String getSendTimeoutHeader() {
122                return this.sendTimeoutHeader;
123        }
124
125        /**
126         * Set the name of the header used to determine the send timeout (if present).
127         * Default {@value #DEFAULT_RECEIVE_TIMEOUT_HEADER}.
128         * The header is removed before sending the message to avoid propagation.
129         * @since 5.0
130         */
131        public void setReceiveTimeoutHeader(String receiveTimeoutHeader) {
132                Assert.notNull(receiveTimeoutHeader, "'receiveTimeoutHeader' cannot be null");
133                this.receiveTimeoutHeader = receiveTimeoutHeader;
134        }
135
136        /**
137         * Return the configured receive-timeout header.
138         * @since 5.0
139         */
140        public String getReceiveTimeoutHeader() {
141                return this.receiveTimeoutHeader;
142        }
143
144        /**
145         * Whether the thread sending a reply should have an exception raised if the
146         * receiving thread isn't going to receive the reply either because it timed out,
147         * or because it already received a reply, or because it got an exception while
148         * sending the request message.
149         * <p>The default value is {@code false} in which case only a WARN message is logged.
150         * If set to {@code true} a {@link MessageDeliveryException} is raised in addition
151         * to the log message.
152         * @param throwExceptionOnLateReply whether to throw an exception or not
153         */
154        public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
155                this.throwExceptionOnLateReply = throwExceptionOnLateReply;
156        }
157
158        @Override
159        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
160                setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
161        }
162
163
164        @Override
165        protected final void doSend(MessageChannel channel, Message<?> message) {
166                doSend(channel, message, sendTimeout(message));
167        }
168
169        protected final void doSend(MessageChannel channel, Message<?> message, long timeout) {
170                Assert.notNull(channel, "MessageChannel is required");
171
172                Message<?> messageToSend = message;
173                MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
174                if (accessor != null && accessor.isMutable()) {
175                        accessor.removeHeader(this.sendTimeoutHeader);
176                        accessor.removeHeader(this.receiveTimeoutHeader);
177                        accessor.setImmutable();
178                }
179                else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
180                                || message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
181                        messageToSend = MessageBuilder.fromMessage(message)
182                                        .setHeader(this.sendTimeoutHeader, null)
183                                        .setHeader(this.receiveTimeoutHeader, null)
184                                        .build();
185                }
186
187                boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));
188
189                if (!sent) {
190                        throw new MessageDeliveryException(message,
191                                        "Failed to send message to channel '" + channel + "' within timeout: " + timeout);
192                }
193        }
194
195        @Override
196        @Nullable
197        protected final Message<?> doReceive(MessageChannel channel) {
198                return doReceive(channel, this.receiveTimeout);
199        }
200
201        @Nullable
202        protected final Message<?> doReceive(MessageChannel channel, long timeout) {
203                Assert.notNull(channel, "MessageChannel is required");
204                Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
205
206                Message<?> message = (timeout >= 0 ?
207                                ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
208
209                if (message == null && logger.isTraceEnabled()) {
210                        logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
211                }
212
213                return message;
214        }
215
216        @Override
217        @Nullable
218        protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) {
219                Assert.notNull(channel, "'channel' is required");
220                Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
221                Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
222
223                long sendTimeout = sendTimeout(requestMessage);
224                long receiveTimeout = receiveTimeout(requestMessage);
225
226                TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(this.throwExceptionOnLateReply);
227                requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel)
228                                .setHeader(this.sendTimeoutHeader, null)
229                                .setHeader(this.receiveTimeoutHeader, null)
230                                .setErrorChannel(tempReplyChannel).build();
231
232                try {
233                        doSend(channel, requestMessage, sendTimeout);
234                }
235                catch (RuntimeException ex) {
236                        tempReplyChannel.setSendFailed(true);
237                        throw ex;
238                }
239
240                Message<?> replyMessage = this.doReceive(tempReplyChannel, receiveTimeout);
241                if (replyMessage != null) {
242                        replyMessage = MessageBuilder.fromMessage(replyMessage)
243                                        .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
244                                        .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
245                                        .build();
246                }
247
248                return replyMessage;
249        }
250
251        private long sendTimeout(Message<?> requestMessage) {
252                Long sendTimeout = headerToLong(requestMessage.getHeaders().get(this.sendTimeoutHeader));
253                return (sendTimeout != null ? sendTimeout : this.sendTimeout);
254        }
255
256        private long receiveTimeout(Message<?> requestMessage) {
257                Long receiveTimeout = headerToLong(requestMessage.getHeaders().get(this.receiveTimeoutHeader));
258                return (receiveTimeout != null ? receiveTimeout : this.receiveTimeout);
259        }
260
261        @Nullable
262        private Long headerToLong(@Nullable Object headerValue) {
263                if (headerValue instanceof Number) {
264                        return ((Number) headerValue).longValue();
265                }
266                else if (headerValue instanceof String) {
267                        return Long.parseLong((String) headerValue);
268                }
269                else {
270                        return null;
271                }
272        }
273
274
275        /**
276         * A temporary channel for receiving a single reply message.
277         */
278        private static final class TemporaryReplyChannel implements PollableChannel {
279
280                private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
281
282                private final CountDownLatch replyLatch = new CountDownLatch(1);
283
284                private final boolean throwExceptionOnLateReply;
285
286                @Nullable
287                private volatile Message<?> replyMessage;
288
289                private volatile boolean hasReceived;
290
291                private volatile boolean hasTimedOut;
292
293                private volatile boolean hasSendFailed;
294
295                TemporaryReplyChannel(boolean throwExceptionOnLateReply) {
296                        this.throwExceptionOnLateReply = throwExceptionOnLateReply;
297                }
298
299                public void setSendFailed(boolean hasSendError) {
300                        this.hasSendFailed = hasSendError;
301                }
302
303                @Override
304                @Nullable
305                public Message<?> receive() {
306                        return this.receive(-1);
307                }
308
309                @Override
310                @Nullable
311                public Message<?> receive(long timeout) {
312                        try {
313                                if (timeout < 0) {
314                                        this.replyLatch.await();
315                                        this.hasReceived = true;
316                                }
317                                else {
318                                        if (this.replyLatch.await(timeout, TimeUnit.MILLISECONDS)) {
319                                                this.hasReceived = true;
320                                        }
321                                        else {
322                                                this.hasTimedOut = true;
323                                        }
324                                }
325                        }
326                        catch (InterruptedException ex) {
327                                Thread.currentThread().interrupt();
328                        }
329                        return this.replyMessage;
330                }
331
332                @Override
333                public boolean send(Message<?> message) {
334                        return this.send(message, -1);
335                }
336
337                @Override
338                public boolean send(Message<?> message, long timeout) {
339                        this.replyMessage = message;
340                        boolean alreadyReceivedReply = this.hasReceived;
341                        this.replyLatch.countDown();
342
343                        String errorDescription = null;
344                        if (this.hasTimedOut) {
345                                errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
346                        }
347                        else if (alreadyReceivedReply) {
348                                errorDescription = "Reply message received but the receiving thread has already received a reply";
349                        }
350                        else if (this.hasSendFailed) {
351                                errorDescription = "Reply message received but the receiving thread has exited due to " +
352                                                "an exception while sending the request message";
353                        }
354
355                        if (errorDescription != null) {
356                                if (logger.isWarnEnabled()) {
357                                        logger.warn(errorDescription + ": " + message);
358                                }
359                                if (this.throwExceptionOnLateReply) {
360                                        throw new MessageDeliveryException(message, errorDescription);
361                                }
362                        }
363
364                        return true;
365                }
366        }
367
368}