001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.core; 018 019import java.util.concurrent.CountDownLatch; 020import java.util.concurrent.TimeUnit; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024 025import org.springframework.beans.BeansException; 026import org.springframework.beans.factory.BeanFactory; 027import org.springframework.beans.factory.BeanFactoryAware; 028import org.springframework.lang.Nullable; 029import org.springframework.messaging.Message; 030import org.springframework.messaging.MessageChannel; 031import org.springframework.messaging.MessageDeliveryException; 032import org.springframework.messaging.MessageHeaders; 033import org.springframework.messaging.PollableChannel; 034import org.springframework.messaging.support.MessageBuilder; 035import org.springframework.messaging.support.MessageHeaderAccessor; 036import org.springframework.util.Assert; 037 038/** 039 * A messaging template that resolves destinations names to {@link MessageChannel}'s 040 * to send and receive messages from. 041 * 042 * @author Mark Fisher 043 * @author Rossen Stoyanchev 044 * @author Gary Russell 045 * @since 4.0 046 */ 047public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> 048 implements BeanFactoryAware { 049 050 /** 051 * The default header key used for a send timeout. 052 */ 053 public static final String DEFAULT_SEND_TIMEOUT_HEADER = "sendTimeout"; 054 055 /** 056 * The default header key used for a receive timeout. 057 */ 058 public static final String DEFAULT_RECEIVE_TIMEOUT_HEADER = "receiveTimeout"; 059 060 private volatile long sendTimeout = -1; 061 062 private volatile long receiveTimeout = -1; 063 064 private String sendTimeoutHeader = DEFAULT_SEND_TIMEOUT_HEADER; 065 066 private String receiveTimeoutHeader = DEFAULT_RECEIVE_TIMEOUT_HEADER; 067 068 private volatile boolean throwExceptionOnLateReply = false; 069 070 071 /** 072 * Configure the default timeout value to use for send operations. 073 * May be overridden for individual messages. 074 * @param sendTimeout the send timeout in milliseconds 075 * @see #setSendTimeoutHeader(String) 076 */ 077 public void setSendTimeout(long sendTimeout) { 078 this.sendTimeout = sendTimeout; 079 } 080 081 /** 082 * Return the configured default send operation timeout value. 083 */ 084 public long getSendTimeout() { 085 return this.sendTimeout; 086 } 087 088 /** 089 * Configure the default timeout value to use for receive operations. 090 * May be overridden for individual messages when using sendAndReceive 091 * operations. 092 * @param receiveTimeout the receive timeout in milliseconds 093 * @see #setReceiveTimeoutHeader(String) 094 */ 095 public void setReceiveTimeout(long receiveTimeout) { 096 this.receiveTimeout = receiveTimeout; 097 } 098 099 /** 100 * Return the configured receive operation timeout value. 101 */ 102 public long getReceiveTimeout() { 103 return this.receiveTimeout; 104 } 105 106 /** 107 * Set the name of the header used to determine the send timeout (if present). 108 * Default {@value #DEFAULT_SEND_TIMEOUT_HEADER}. 109 * <p>The header is removed before sending the message to avoid propagation. 110 * @since 5.0 111 */ 112 public void setSendTimeoutHeader(String sendTimeoutHeader) { 113 Assert.notNull(sendTimeoutHeader, "'sendTimeoutHeader' cannot be null"); 114 this.sendTimeoutHeader = sendTimeoutHeader; 115 } 116 117 /** 118 * Return the configured send-timeout header. 119 * @since 5.0 120 */ 121 public String getSendTimeoutHeader() { 122 return this.sendTimeoutHeader; 123 } 124 125 /** 126 * Set the name of the header used to determine the send timeout (if present). 127 * Default {@value #DEFAULT_RECEIVE_TIMEOUT_HEADER}. 128 * The header is removed before sending the message to avoid propagation. 129 * @since 5.0 130 */ 131 public void setReceiveTimeoutHeader(String receiveTimeoutHeader) { 132 Assert.notNull(receiveTimeoutHeader, "'receiveTimeoutHeader' cannot be null"); 133 this.receiveTimeoutHeader = receiveTimeoutHeader; 134 } 135 136 /** 137 * Return the configured receive-timeout header. 138 * @since 5.0 139 */ 140 public String getReceiveTimeoutHeader() { 141 return this.receiveTimeoutHeader; 142 } 143 144 /** 145 * Whether the thread sending a reply should have an exception raised if the 146 * receiving thread isn't going to receive the reply either because it timed out, 147 * or because it already received a reply, or because it got an exception while 148 * sending the request message. 149 * <p>The default value is {@code false} in which case only a WARN message is logged. 150 * If set to {@code true} a {@link MessageDeliveryException} is raised in addition 151 * to the log message. 152 * @param throwExceptionOnLateReply whether to throw an exception or not 153 */ 154 public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) { 155 this.throwExceptionOnLateReply = throwExceptionOnLateReply; 156 } 157 158 @Override 159 public void setBeanFactory(BeanFactory beanFactory) throws BeansException { 160 setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory)); 161 } 162 163 164 @Override 165 protected final void doSend(MessageChannel channel, Message<?> message) { 166 doSend(channel, message, sendTimeout(message)); 167 } 168 169 protected final void doSend(MessageChannel channel, Message<?> message, long timeout) { 170 Assert.notNull(channel, "MessageChannel is required"); 171 172 Message<?> messageToSend = message; 173 MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); 174 if (accessor != null && accessor.isMutable()) { 175 accessor.removeHeader(this.sendTimeoutHeader); 176 accessor.removeHeader(this.receiveTimeoutHeader); 177 accessor.setImmutable(); 178 } 179 else if (message.getHeaders().containsKey(this.sendTimeoutHeader) 180 || message.getHeaders().containsKey(this.receiveTimeoutHeader)) { 181 messageToSend = MessageBuilder.fromMessage(message) 182 .setHeader(this.sendTimeoutHeader, null) 183 .setHeader(this.receiveTimeoutHeader, null) 184 .build(); 185 } 186 187 boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend)); 188 189 if (!sent) { 190 throw new MessageDeliveryException(message, 191 "Failed to send message to channel '" + channel + "' within timeout: " + timeout); 192 } 193 } 194 195 @Override 196 @Nullable 197 protected final Message<?> doReceive(MessageChannel channel) { 198 return doReceive(channel, this.receiveTimeout); 199 } 200 201 @Nullable 202 protected final Message<?> doReceive(MessageChannel channel, long timeout) { 203 Assert.notNull(channel, "MessageChannel is required"); 204 Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages"); 205 206 Message<?> message = (timeout >= 0 ? 207 ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive()); 208 209 if (message == null && logger.isTraceEnabled()) { 210 logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout); 211 } 212 213 return message; 214 } 215 216 @Override 217 @Nullable 218 protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) { 219 Assert.notNull(channel, "'channel' is required"); 220 Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel(); 221 Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel(); 222 223 long sendTimeout = sendTimeout(requestMessage); 224 long receiveTimeout = receiveTimeout(requestMessage); 225 226 TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(this.throwExceptionOnLateReply); 227 requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel) 228 .setHeader(this.sendTimeoutHeader, null) 229 .setHeader(this.receiveTimeoutHeader, null) 230 .setErrorChannel(tempReplyChannel).build(); 231 232 try { 233 doSend(channel, requestMessage, sendTimeout); 234 } 235 catch (RuntimeException ex) { 236 tempReplyChannel.setSendFailed(true); 237 throw ex; 238 } 239 240 Message<?> replyMessage = this.doReceive(tempReplyChannel, receiveTimeout); 241 if (replyMessage != null) { 242 replyMessage = MessageBuilder.fromMessage(replyMessage) 243 .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader) 244 .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader) 245 .build(); 246 } 247 248 return replyMessage; 249 } 250 251 private long sendTimeout(Message<?> requestMessage) { 252 Long sendTimeout = headerToLong(requestMessage.getHeaders().get(this.sendTimeoutHeader)); 253 return (sendTimeout != null ? sendTimeout : this.sendTimeout); 254 } 255 256 private long receiveTimeout(Message<?> requestMessage) { 257 Long receiveTimeout = headerToLong(requestMessage.getHeaders().get(this.receiveTimeoutHeader)); 258 return (receiveTimeout != null ? receiveTimeout : this.receiveTimeout); 259 } 260 261 @Nullable 262 private Long headerToLong(@Nullable Object headerValue) { 263 if (headerValue instanceof Number) { 264 return ((Number) headerValue).longValue(); 265 } 266 else if (headerValue instanceof String) { 267 return Long.parseLong((String) headerValue); 268 } 269 else { 270 return null; 271 } 272 } 273 274 275 /** 276 * A temporary channel for receiving a single reply message. 277 */ 278 private static final class TemporaryReplyChannel implements PollableChannel { 279 280 private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class); 281 282 private final CountDownLatch replyLatch = new CountDownLatch(1); 283 284 private final boolean throwExceptionOnLateReply; 285 286 @Nullable 287 private volatile Message<?> replyMessage; 288 289 private volatile boolean hasReceived; 290 291 private volatile boolean hasTimedOut; 292 293 private volatile boolean hasSendFailed; 294 295 TemporaryReplyChannel(boolean throwExceptionOnLateReply) { 296 this.throwExceptionOnLateReply = throwExceptionOnLateReply; 297 } 298 299 public void setSendFailed(boolean hasSendError) { 300 this.hasSendFailed = hasSendError; 301 } 302 303 @Override 304 @Nullable 305 public Message<?> receive() { 306 return this.receive(-1); 307 } 308 309 @Override 310 @Nullable 311 public Message<?> receive(long timeout) { 312 try { 313 if (timeout < 0) { 314 this.replyLatch.await(); 315 this.hasReceived = true; 316 } 317 else { 318 if (this.replyLatch.await(timeout, TimeUnit.MILLISECONDS)) { 319 this.hasReceived = true; 320 } 321 else { 322 this.hasTimedOut = true; 323 } 324 } 325 } 326 catch (InterruptedException ex) { 327 Thread.currentThread().interrupt(); 328 } 329 return this.replyMessage; 330 } 331 332 @Override 333 public boolean send(Message<?> message) { 334 return this.send(message, -1); 335 } 336 337 @Override 338 public boolean send(Message<?> message, long timeout) { 339 this.replyMessage = message; 340 boolean alreadyReceivedReply = this.hasReceived; 341 this.replyLatch.countDown(); 342 343 String errorDescription = null; 344 if (this.hasTimedOut) { 345 errorDescription = "Reply message received but the receiving thread has exited due to a timeout"; 346 } 347 else if (alreadyReceivedReply) { 348 errorDescription = "Reply message received but the receiving thread has already received a reply"; 349 } 350 else if (this.hasSendFailed) { 351 errorDescription = "Reply message received but the receiving thread has exited due to " + 352 "an exception while sending the request message"; 353 } 354 355 if (errorDescription != null) { 356 if (logger.isWarnEnabled()) { 357 logger.warn(errorDescription + ": " + message); 358 } 359 if (this.throwExceptionOnLateReply) { 360 throw new MessageDeliveryException(message, errorDescription); 361 } 362 } 363 364 return true; 365 } 366 } 367 368}