001/* 002 * Copyright 2002-2016 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.core; 018 019import java.util.concurrent.CountDownLatch; 020import java.util.concurrent.TimeUnit; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024 025import org.springframework.beans.BeansException; 026import org.springframework.beans.factory.BeanFactory; 027import org.springframework.beans.factory.BeanFactoryAware; 028import org.springframework.messaging.Message; 029import org.springframework.messaging.MessageChannel; 030import org.springframework.messaging.MessageDeliveryException; 031import org.springframework.messaging.MessageHeaders; 032import org.springframework.messaging.PollableChannel; 033import org.springframework.messaging.support.MessageBuilder; 034import org.springframework.messaging.support.MessageHeaderAccessor; 035import org.springframework.util.Assert; 036 037/** 038 * A messaging template that resolves destinations names to {@link MessageChannel}'s 039 * to send and receive messages from. 040 * 041 * @author Mark Fisher 042 * @author Rossen Stoyanchev 043 * @since 4.0 044 */ 045public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> 046 implements BeanFactoryAware { 047 048 private volatile long sendTimeout = -1; 049 050 private volatile long receiveTimeout = -1; 051 052 private volatile boolean throwExceptionOnLateReply = false; 053 054 055 /** 056 * Configure the timeout value to use for send operations. 057 * @param sendTimeout the send timeout in milliseconds 058 */ 059 public void setSendTimeout(long sendTimeout) { 060 this.sendTimeout = sendTimeout; 061 } 062 063 /** 064 * Return the configured send operation timeout value. 065 */ 066 public long getSendTimeout() { 067 return this.sendTimeout; 068 } 069 070 /** 071 * Configure the timeout value to use for receive operations. 072 * @param receiveTimeout the receive timeout in milliseconds 073 */ 074 public void setReceiveTimeout(long receiveTimeout) { 075 this.receiveTimeout = receiveTimeout; 076 } 077 078 /** 079 * Return the configured receive operation timeout value. 080 */ 081 public long getReceiveTimeout() { 082 return this.receiveTimeout; 083 } 084 085 /** 086 * Whether the thread sending a reply should have an exception raised if the 087 * receiving thread isn't going to receive the reply either because it timed out, 088 * or because it already received a reply, or because it got an exception while 089 * sending the request message. 090 * <p>The default value is {@code false} in which case only a WARN message is logged. 091 * If set to {@code true} a {@link MessageDeliveryException} is raised in addition 092 * to the log message. 093 * @param throwExceptionOnLateReply whether to throw an exception or not 094 */ 095 public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) { 096 this.throwExceptionOnLateReply = throwExceptionOnLateReply; 097 } 098 099 @Override 100 public void setBeanFactory(BeanFactory beanFactory) throws BeansException { 101 setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory)); 102 } 103 104 105 @Override 106 protected final void doSend(MessageChannel channel, Message<?> message) { 107 Assert.notNull(channel, "MessageChannel is required"); 108 109 MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); 110 if (accessor != null && accessor.isMutable()) { 111 accessor.setImmutable(); 112 } 113 114 long timeout = this.sendTimeout; 115 boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message)); 116 117 if (!sent) { 118 throw new MessageDeliveryException(message, 119 "Failed to send message to channel '" + channel + "' within timeout: " + timeout); 120 } 121 } 122 123 @Override 124 protected final Message<?> doReceive(MessageChannel channel) { 125 Assert.notNull(channel, "MessageChannel is required"); 126 Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages"); 127 128 long timeout = this.receiveTimeout; 129 Message<?> message = (timeout >= 0 ? 130 ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive()); 131 132 if (message == null && this.logger.isTraceEnabled()) { 133 this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout); 134 } 135 136 return message; 137 } 138 139 @Override 140 protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) { 141 Assert.notNull(channel, "'channel' is required"); 142 Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel(); 143 Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel(); 144 145 TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(); 146 requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel). 147 setErrorChannel(tempReplyChannel).build(); 148 149 try { 150 doSend(channel, requestMessage); 151 } 152 catch (RuntimeException ex) { 153 tempReplyChannel.setSendFailed(true); 154 throw ex; 155 } 156 157 Message<?> replyMessage = this.doReceive(tempReplyChannel); 158 if (replyMessage != null) { 159 replyMessage = MessageBuilder.fromMessage(replyMessage) 160 .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader) 161 .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader) 162 .build(); 163 } 164 165 return replyMessage; 166 } 167 168 169 /** 170 * A temporary channel for receiving a single reply message. 171 */ 172 private class TemporaryReplyChannel implements PollableChannel { 173 174 private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class); 175 176 private final CountDownLatch replyLatch = new CountDownLatch(1); 177 178 private volatile Message<?> replyMessage; 179 180 private volatile boolean hasReceived; 181 182 private volatile boolean hasTimedOut; 183 184 private volatile boolean hasSendFailed; 185 186 public void setSendFailed(boolean hasSendError) { 187 this.hasSendFailed = hasSendError; 188 } 189 190 @Override 191 public Message<?> receive() { 192 return this.receive(-1); 193 } 194 195 @Override 196 public Message<?> receive(long timeout) { 197 try { 198 if (GenericMessagingTemplate.this.receiveTimeout < 0) { 199 this.replyLatch.await(); 200 this.hasReceived = true; 201 } 202 else { 203 if (this.replyLatch.await(GenericMessagingTemplate.this.receiveTimeout, TimeUnit.MILLISECONDS)) { 204 this.hasReceived = true; 205 } 206 else { 207 this.hasTimedOut = true; 208 } 209 } 210 } 211 catch (InterruptedException ex) { 212 Thread.currentThread().interrupt(); 213 } 214 return this.replyMessage; 215 } 216 217 @Override 218 public boolean send(Message<?> message) { 219 return this.send(message, -1); 220 } 221 222 @Override 223 public boolean send(Message<?> message, long timeout) { 224 this.replyMessage = message; 225 boolean alreadyReceivedReply = this.hasReceived; 226 this.replyLatch.countDown(); 227 228 String errorDescription = null; 229 if (this.hasTimedOut) { 230 errorDescription = "Reply message received but the receiving thread has exited due to a timeout"; 231 } 232 else if (alreadyReceivedReply) { 233 errorDescription = "Reply message received but the receiving thread has already received a reply"; 234 } 235 else if (this.hasSendFailed) { 236 errorDescription = "Reply message received but the receiving thread has exited due to " + 237 "an exception while sending the request message"; 238 } 239 240 if (errorDescription != null) { 241 if (logger.isWarnEnabled()) { 242 logger.warn(errorDescription + ":" + message); 243 } 244 if (GenericMessagingTemplate.this.throwExceptionOnLateReply) { 245 throw new MessageDeliveryException(message, errorDescription); 246 } 247 } 248 249 return true; 250 } 251 } 252 253}