/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.support;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
 * Abstract base class for {@link MessageChannel} implementations.
 *
 * <p>Maintains an ordered list of {@link ChannelInterceptor ChannelInterceptors}
 * and drives them around every send through a per-call
 * {@link ChannelInterceptorChain}. Subclasses supply the actual delivery in
 * {@link #sendInternal(Message, long)}.
 *
 * <p>The interceptor list is a plain {@link ArrayList} without any
 * synchronization in this class.
 *
 * @author Rossen Stoyanchev
 * @since 4.0
 */
public abstract class AbstractMessageChannel implements MessageChannel, InterceptableChannel, BeanNameAware {

	protected Log logger = LogFactory.getLog(getClass());

	private String beanName;

	private final List<ChannelInterceptor> interceptors = new ArrayList<>(5);


	/**
	 * Create a channel whose initial bean name is the simple class name followed
	 * by {@code "@"} and the identity hex string of this instance. The name may
	 * later be replaced through {@link #setBeanName(String)}.
	 */
	public AbstractMessageChannel() {
		this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
	}


	/**
	 * Set an alternative logger to use than the one based on the class name.
	 * @param logger the logger to use
	 * @since 5.1
	 */
	public void setLogger(Log logger) {
		this.logger = logger;
	}

	/**
	 * Return the currently configured Logger.
	 * @since 5.1
	 */
	public Log getLogger() {
		return this.logger;
	}

	/**
	 * A message channel uses the bean name primarily for logging purposes.
	 */
	@Override
	public void setBeanName(String name) {
		this.beanName = name;
	}

	/**
	 * Return the bean name for this message channel.
	 */
	public String getBeanName() {
		return this.beanName;
	}


	/**
	 * Replace all currently registered interceptors with the given ones.
	 * The given list is checked for {@code null} elements before the existing
	 * interceptors are cleared.
	 * @param interceptors the interceptors to use from now on
	 */
	@Override
	public void setInterceptors(List<ChannelInterceptor> interceptors) {
		Assert.noNullElements(interceptors, "'interceptors' must not contain null elements");
		this.interceptors.clear();
		this.interceptors.addAll(interceptors);
	}

	/**
	 * Append an interceptor to the end of the list.
	 * @param interceptor the interceptor to add (checked for {@code null})
	 */
	@Override
	public void addInterceptor(ChannelInterceptor interceptor) {
		Assert.notNull(interceptor, "'interceptor' must not be null");
		this.interceptors.add(interceptor);
	}

	/**
	 * Insert an interceptor at the given position in the list.
	 * @param index the position to insert at
	 * @param interceptor the interceptor to add (checked for {@code null})
	 */
	@Override
	public void addInterceptor(int index, ChannelInterceptor interceptor) {
		Assert.notNull(interceptor, "'interceptor' must not be null");
		this.interceptors.add(index, interceptor);
	}

	/**
	 * Return an unmodifiable view of the registered interceptors.
	 */
	@Override
	public List<ChannelInterceptor> getInterceptors() {
		return Collections.unmodifiableList(this.interceptors);
	}

	/**
	 * Remove the given interceptor.
	 * @param interceptor the interceptor to remove
	 * @return the result of {@link List#remove(Object)} on the interceptor list
	 */
	@Override
	public boolean removeInterceptor(ChannelInterceptor interceptor) {
		return this.interceptors.remove(interceptor);
	}

	/**
	 * Remove the interceptor at the given position.
	 * @param index the position of the interceptor to remove
	 * @return the interceptor that was removed
	 */
	@Override
	public ChannelInterceptor removeInterceptor(int index) {
		return this.interceptors.remove(index);
	}


	/**
	 * Send the given message, delegating to {@link #send(Message, long)} with
	 * {@code INDEFINITE_TIMEOUT}.
	 * @param message the message to send
	 * @return the result of {@link #send(Message, long)}
	 */
	@Override
	public final boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}

	/**
	 * Send the given message through the interceptor chain and
	 * {@link #sendInternal(Message, long)}.
	 * <p>The sequence is: {@code preSend} on each interceptor, then
	 * {@code sendInternal}, then {@code postSend} on each interceptor, then
	 * {@code afterSendCompletion} on those interceptors whose {@code preSend}
	 * completed. If {@code preSend} yields {@code null}, nothing is sent and
	 * {@code false} is returned.
	 * @param message the message to send (checked for {@code null})
	 * @param timeout the timeout handed to {@code sendInternal}
	 * @return the value returned by {@code sendInternal}, or {@code false} if
	 * an interceptor precluded the send
	 * @throws MessagingException as thrown by the interceptors or by
	 * {@code sendInternal}; any other failure is wrapped in a
	 * {@link MessageDeliveryException}
	 */
	@Override
	public final boolean send(Message<?> message, long timeout) {
		Assert.notNull(message, "Message must not be null");
		Message<?> messageToUse = message;
		// A fresh chain per call: it records how far preSend got for this send only.
		ChannelInterceptorChain chain = new ChannelInterceptorChain();
		boolean sent = false;
		try {
			messageToUse = chain.applyPreSend(messageToUse, this);
			if (messageToUse == null) {
				// applyPreSend has already triggered afterSendCompletion in this case.
				return false;
			}
			sent = sendInternal(messageToUse, timeout);
			chain.applyPostSend(messageToUse, this, sent);
			chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
			return sent;
		}
		catch (Exception ex) {
			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
			if (ex instanceof MessagingException) {
				throw (MessagingException) ex;
			}
			throw new MessageDeliveryException(messageToUse, "Failed to send message to " + this, ex);
		}
		catch (Throwable err) {
			// afterSendCompletion accepts an Exception only, so wrap before notifying.
			MessageDeliveryException ex2 =
					new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
			chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
			throw ex2;
		}
	}

	/**
	 * Perform the actual send. Invoked by {@link #send(Message, long)} after the
	 * {@code preSend} interceptor callbacks, with the message they produced.
	 * @param message the message to send
	 * @param timeout the timeout passed to {@link #send(Message, long)}
	 * @return the value to report from {@code send}, also passed to the
	 * interceptors as the "sent" flag
	 */
	protected abstract boolean sendInternal(Message<?> message, long timeout);


	@Override
	public String toString() {
		return getClass().getSimpleName() + "[" + this.beanName + "]";
	}


	/**
	 * Assists with the invocation of the configured channel interceptors.
	 *
	 * <p>An instance tracks, separately for send and receive, the index of the
	 * last interceptor whose "pre" callback completed, so that the matching
	 * "after completion" callbacks are invoked only on those interceptors and
	 * in reverse order.
	 */
	protected class ChannelInterceptorChain {

		private int sendInterceptorIndex = -1;

		private int receiveInterceptorIndex = -1;

		/**
		 * Invoke {@code preSend} on each interceptor in order, feeding each one
		 * the message returned by the previous one.
		 * <p>If an interceptor returns {@code null}, {@code afterSendCompletion}
		 * is triggered on the interceptors that already passed and {@code null}
		 * is returned.
		 * @param message the message to send
		 * @param channel the channel the message is being sent to
		 * @return the resulting message, or {@code null} if the send was precluded
		 */
		@Nullable
		public Message<?> applyPreSend(Message<?> message, MessageChannel channel) {
			Message<?> messageToUse = message;
			for (ChannelInterceptor interceptor : interceptors) {
				Message<?> resolvedMessage = interceptor.preSend(messageToUse, channel);
				if (resolvedMessage == null) {
					if (logger.isDebugEnabled()) {
						String name = interceptor.getClass().getSimpleName();
						logger.debug(name + " returned null from preSend, i.e. precluding the send.");
					}
					triggerAfterSendCompletion(messageToUse, channel, false, null);
					return null;
				}
				messageToUse = resolvedMessage;
				this.sendInterceptorIndex++;
			}
			return messageToUse;
		}

		/**
		 * Invoke {@code postSend} on every interceptor in order.
		 * @param message the message that was handed to the send
		 * @param channel the channel the message was sent to
		 * @param sent the outcome of the send
		 */
		public void applyPostSend(Message<?> message, MessageChannel channel, boolean sent) {
			for (ChannelInterceptor interceptor : interceptors) {
				interceptor.postSend(message, channel, sent);
			}
		}

		/**
		 * Invoke {@code afterSendCompletion}, in reverse order, on the
		 * interceptors whose {@code preSend} completed. Anything thrown by an
		 * interceptor is logged at error level and not propagated.
		 * @param message the message involved in the send
		 * @param channel the channel the message was sent to
		 * @param sent the outcome of the send
		 * @param ex the exception raised during the send, if any
		 */
		public void triggerAfterSendCompletion(Message<?> message, MessageChannel channel,
				boolean sent, @Nullable Exception ex) {

			for (int i = this.sendInterceptorIndex; i >= 0; i--) {
				ChannelInterceptor interceptor = interceptors.get(i);
				try {
					interceptor.afterSendCompletion(message, channel, sent, ex);
				}
				catch (Throwable ex2) {
					// Same guard as in triggerAfterReceiveCompletion.
					if (logger.isErrorEnabled()) {
						logger.error("Exception from afterSendCompletion in " + interceptor, ex2);
					}
				}
			}
		}

		/**
		 * Invoke {@code preReceive} on each interceptor in order.
		 * <p>If an interceptor returns {@code false},
		 * {@code afterReceiveCompletion} is triggered on the interceptors that
		 * already passed and {@code false} is returned.
		 * @param channel the channel to receive from
		 * @return {@code true} if every interceptor returned {@code true}
		 */
		public boolean applyPreReceive(MessageChannel channel) {
			for (ChannelInterceptor interceptor : interceptors) {
				if (!interceptor.preReceive(channel)) {
					triggerAfterReceiveCompletion(null, channel, null);
					return false;
				}
				this.receiveInterceptorIndex++;
			}
			return true;
		}

		/**
		 * Invoke {@code postReceive} on each interceptor in order, feeding each
		 * one the message returned by the previous one, and stopping at the
		 * first interceptor that returns {@code null}.
		 * @param message the received message
		 * @param channel the channel the message was received from
		 * @return the resulting message, or {@code null} if an interceptor
		 * returned {@code null}
		 */
		@Nullable
		public Message<?> applyPostReceive(Message<?> message, MessageChannel channel) {
			Message<?> messageToUse = message;
			for (ChannelInterceptor interceptor : interceptors) {
				messageToUse = interceptor.postReceive(messageToUse, channel);
				if (messageToUse == null) {
					return null;
				}
			}
			return messageToUse;
		}

		/**
		 * Invoke {@code afterReceiveCompletion}, in reverse order, on the
		 * interceptors whose {@code preReceive} completed. Anything thrown by an
		 * interceptor is logged at error level and not propagated.
		 * @param message the received message, if any
		 * @param channel the channel that was received from
		 * @param ex the exception raised during the receive, if any
		 */
		public void triggerAfterReceiveCompletion(
				@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {

			for (int i = this.receiveInterceptorIndex; i >= 0; i--) {
				ChannelInterceptor interceptor = interceptors.get(i);
				try {
					interceptor.afterReceiveCompletion(message, channel, ex);
				}
				catch (Throwable ex2) {
					if (logger.isErrorEnabled()) {
						logger.error("Exception from afterReceiveCompletion in " + interceptor, ex2);
					}
				}
			}
		}
	}

}