001/*
002 * Copyright 2002-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.core;
018
019import java.util.concurrent.CountDownLatch;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024
025import org.springframework.beans.BeansException;
026import org.springframework.beans.factory.BeanFactory;
027import org.springframework.beans.factory.BeanFactoryAware;
028import org.springframework.messaging.Message;
029import org.springframework.messaging.MessageChannel;
030import org.springframework.messaging.MessageDeliveryException;
031import org.springframework.messaging.MessageHeaders;
032import org.springframework.messaging.PollableChannel;
033import org.springframework.messaging.support.MessageBuilder;
034import org.springframework.messaging.support.MessageHeaderAccessor;
035import org.springframework.util.Assert;
036
037/**
038 * A messaging template that resolves destinations names to {@link MessageChannel}'s
039 * to send and receive messages from.
040 *
041 * @author Mark Fisher
042 * @author Rossen Stoyanchev
043 * @since 4.0
044 */
045public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel>
046                implements BeanFactoryAware {
047
048        private volatile long sendTimeout = -1;
049
050        private volatile long receiveTimeout = -1;
051
052        private volatile boolean throwExceptionOnLateReply = false;
053
054
055        /**
056         * Configure the timeout value to use for send operations.
057         * @param sendTimeout the send timeout in milliseconds
058         */
059        public void setSendTimeout(long sendTimeout) {
060                this.sendTimeout = sendTimeout;
061        }
062
063        /**
064         * Return the configured send operation timeout value.
065         */
066        public long getSendTimeout() {
067                return this.sendTimeout;
068        }
069
070        /**
071         * Configure the timeout value to use for receive operations.
072         * @param receiveTimeout the receive timeout in milliseconds
073         */
074        public void setReceiveTimeout(long receiveTimeout) {
075                this.receiveTimeout = receiveTimeout;
076        }
077
078        /**
079         * Return the configured receive operation timeout value.
080         */
081        public long getReceiveTimeout() {
082                return this.receiveTimeout;
083        }
084
085        /**
086         * Whether the thread sending a reply should have an exception raised if the
087         * receiving thread isn't going to receive the reply either because it timed out,
088         * or because it already received a reply, or because it got an exception while
089         * sending the request message.
090         * <p>The default value is {@code false} in which case only a WARN message is logged.
091         * If set to {@code true} a {@link MessageDeliveryException} is raised in addition
092         * to the log message.
093         * @param throwExceptionOnLateReply whether to throw an exception or not
094         */
095        public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
096                this.throwExceptionOnLateReply = throwExceptionOnLateReply;
097        }
098
099        @Override
100        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
101                setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
102        }
103
104
105        @Override
106        protected final void doSend(MessageChannel channel, Message<?> message) {
107                Assert.notNull(channel, "MessageChannel is required");
108
109                MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
110                if (accessor != null && accessor.isMutable()) {
111                        accessor.setImmutable();
112                }
113
114                long timeout = this.sendTimeout;
115                boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message));
116
117                if (!sent) {
118                        throw new MessageDeliveryException(message,
119                                        "Failed to send message to channel '" + channel + "' within timeout: " + timeout);
120                }
121        }
122
123        @Override
124        protected final Message<?> doReceive(MessageChannel channel) {
125                Assert.notNull(channel, "MessageChannel is required");
126                Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
127
128                long timeout = this.receiveTimeout;
129                Message<?> message = (timeout >= 0 ?
130                                ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
131
132                if (message == null && this.logger.isTraceEnabled()) {
133                        this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
134                }
135
136                return message;
137        }
138
139        @Override
140        protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) {
141                Assert.notNull(channel, "'channel' is required");
142                Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
143                Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
144
145                TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel();
146                requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel).
147                                setErrorChannel(tempReplyChannel).build();
148
149                try {
150                        doSend(channel, requestMessage);
151                }
152                catch (RuntimeException ex) {
153                        tempReplyChannel.setSendFailed(true);
154                        throw ex;
155                }
156
157                Message<?> replyMessage = this.doReceive(tempReplyChannel);
158                if (replyMessage != null) {
159                        replyMessage = MessageBuilder.fromMessage(replyMessage)
160                                        .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
161                                        .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
162                                        .build();
163                }
164
165                return replyMessage;
166        }
167
168
169        /**
170         * A temporary channel for receiving a single reply message.
171         */
172        private class TemporaryReplyChannel implements PollableChannel {
173
174                private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
175
176                private final CountDownLatch replyLatch = new CountDownLatch(1);
177
178                private volatile Message<?> replyMessage;
179
180                private volatile boolean hasReceived;
181
182                private volatile boolean hasTimedOut;
183
184                private volatile boolean hasSendFailed;
185
186                public void setSendFailed(boolean hasSendError) {
187                        this.hasSendFailed = hasSendError;
188                }
189
190                @Override
191                public Message<?> receive() {
192                        return this.receive(-1);
193                }
194
195                @Override
196                public Message<?> receive(long timeout) {
197                        try {
198                                if (GenericMessagingTemplate.this.receiveTimeout < 0) {
199                                        this.replyLatch.await();
200                                        this.hasReceived = true;
201                                }
202                                else {
203                                        if (this.replyLatch.await(GenericMessagingTemplate.this.receiveTimeout, TimeUnit.MILLISECONDS)) {
204                                                this.hasReceived = true;
205                                        }
206                                        else {
207                                                this.hasTimedOut = true;
208                                        }
209                                }
210                        }
211                        catch (InterruptedException ex) {
212                                Thread.currentThread().interrupt();
213                        }
214                        return this.replyMessage;
215                }
216
217                @Override
218                public boolean send(Message<?> message) {
219                        return this.send(message, -1);
220                }
221
222                @Override
223                public boolean send(Message<?> message, long timeout) {
224                        this.replyMessage = message;
225                        boolean alreadyReceivedReply = this.hasReceived;
226                        this.replyLatch.countDown();
227
228                        String errorDescription = null;
229                        if (this.hasTimedOut) {
230                                errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
231                        }
232                        else if (alreadyReceivedReply) {
233                                errorDescription = "Reply message received but the receiving thread has already received a reply";
234                        }
235                        else if (this.hasSendFailed) {
236                                errorDescription = "Reply message received but the receiving thread has exited due to " +
237                                                "an exception while sending the request message";
238                        }
239
240                        if (errorDescription != null) {
241                                if (logger.isWarnEnabled()) {
242                                        logger.warn(errorDescription + ":" + message);
243                                }
244                                if (GenericMessagingTemplate.this.throwExceptionOnLateReply) {
245                                        throw new MessageDeliveryException(message, errorDescription);
246                                }
247                        }
248
249                        return true;
250                }
251        }
252
253}