001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.support; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.List; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025 026import org.springframework.beans.factory.BeanNameAware; 027import org.springframework.messaging.Message; 028import org.springframework.messaging.MessageChannel; 029import org.springframework.messaging.MessageDeliveryException; 030import org.springframework.messaging.MessagingException; 031import org.springframework.util.Assert; 032import org.springframework.util.ObjectUtils; 033 034/** 035 * Abstract base class for {@link MessageChannel} implementations. 036 * 037 * @author Rossen Stoyanchev 038 * @since 4.0 039 */ 040public abstract class AbstractMessageChannel implements MessageChannel, InterceptableChannel, BeanNameAware { 041 042 protected final Log logger = LogFactory.getLog(getClass()); 043 044 private String beanName; 045 046 private final List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>(5); 047 048 049 public AbstractMessageChannel() { 050 this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this); 051 } 052 053 054 /** 055 * A message channel uses the bean name primarily for logging purposes. 056 */ 057 @Override 058 public void setBeanName(String name) { 059 this.beanName = name; 060 } 061 062 /** 063 * Return the bean name for this message channel. 064 */ 065 public String getBeanName() { 066 return this.beanName; 067 } 068 069 070 @Override 071 public void setInterceptors(List<ChannelInterceptor> interceptors) { 072 this.interceptors.clear(); 073 this.interceptors.addAll(interceptors); 074 } 075 076 @Override 077 public void addInterceptor(ChannelInterceptor interceptor) { 078 Assert.notNull(interceptor, "ChannelInterceptor must not be null"); 079 this.interceptors.add(interceptor); 080 } 081 082 @Override 083 public void addInterceptor(int index, ChannelInterceptor interceptor) { 084 Assert.notNull(interceptor, "ChannelInterceptor must not be null"); 085 this.interceptors.add(index, interceptor); 086 } 087 088 @Override 089 public List<ChannelInterceptor> getInterceptors() { 090 return Collections.unmodifiableList(this.interceptors); 091 } 092 093 @Override 094 public boolean removeInterceptor(ChannelInterceptor interceptor) { 095 return this.interceptors.remove(interceptor); 096 } 097 098 @Override 099 public ChannelInterceptor removeInterceptor(int index) { 100 return this.interceptors.remove(index); 101 } 102 103 104 @Override 105 public final boolean send(Message<?> message) { 106 return send(message, INDEFINITE_TIMEOUT); 107 } 108 109 @Override 110 public final boolean send(Message<?> message, long timeout) { 111 Assert.notNull(message, "Message must not be null"); 112 Message<?> messageToUse = message; 113 ChannelInterceptorChain chain = new ChannelInterceptorChain(); 114 boolean sent = false; 115 try { 116 messageToUse = chain.applyPreSend(messageToUse, this); 117 if (messageToUse == null) { 118 return false; 119 } 120 sent = sendInternal(messageToUse, timeout); 121 chain.applyPostSend(messageToUse, this, sent); 122 chain.triggerAfterSendCompletion(messageToUse, this, sent, null); 123 return sent; 124 } 125 catch (Exception ex) { 126 chain.triggerAfterSendCompletion(messageToUse, this, sent, ex); 127 if (ex instanceof MessagingException) { 128 throw (MessagingException) ex; 129 } 130 throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex); 131 } 132 catch (Throwable err) { 133 MessageDeliveryException ex2 = 134 new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err); 135 chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2); 136 throw ex2; 137 } 138 } 139 140 protected abstract boolean sendInternal(Message<?> message, long timeout); 141 142 143 @Override 144 public String toString() { 145 return getClass().getSimpleName() + "[" + this.beanName + "]"; 146 } 147 148 149 /** 150 * Assists with the invocation of the configured channel interceptors. 151 */ 152 protected class ChannelInterceptorChain { 153 154 private int sendInterceptorIndex = -1; 155 156 private int receiveInterceptorIndex = -1; 157 158 public Message<?> applyPreSend(Message<?> message, MessageChannel channel) { 159 Message<?> messageToUse = message; 160 for (ChannelInterceptor interceptor : interceptors) { 161 Message<?> resolvedMessage = interceptor.preSend(messageToUse, channel); 162 if (resolvedMessage == null) { 163 String name = interceptor.getClass().getSimpleName(); 164 if (logger.isDebugEnabled()) { 165 logger.debug(name + " returned null from preSend, i.e. precluding the send."); 166 } 167 triggerAfterSendCompletion(messageToUse, channel, false, null); 168 return null; 169 } 170 messageToUse = resolvedMessage; 171 this.sendInterceptorIndex++; 172 } 173 return messageToUse; 174 } 175 176 public void applyPostSend(Message<?> message, MessageChannel channel, boolean sent) { 177 for (ChannelInterceptor interceptor : interceptors) { 178 interceptor.postSend(message, channel, sent); 179 } 180 } 181 182 public void triggerAfterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { 183 for (int i = this.sendInterceptorIndex; i >= 0; i--) { 184 ChannelInterceptor interceptor = interceptors.get(i); 185 try { 186 interceptor.afterSendCompletion(message, channel, sent, ex); 187 } 188 catch (Throwable ex2) { 189 logger.error("Exception from afterSendCompletion in " + interceptor, ex2); 190 } 191 } 192 } 193 194 public boolean applyPreReceive(MessageChannel channel) { 195 for (ChannelInterceptor interceptor : interceptors) { 196 if (!interceptor.preReceive(channel)) { 197 triggerAfterReceiveCompletion(null, channel, null); 198 return false; 199 } 200 this.receiveInterceptorIndex++; 201 } 202 return true; 203 } 204 205 public Message<?> applyPostReceive(Message<?> message, MessageChannel channel) { 206 Message<?> messageToUse = message; 207 for (ChannelInterceptor interceptor : interceptors) { 208 messageToUse = interceptor.postReceive(messageToUse, channel); 209 if (messageToUse == null) { 210 return null; 211 } 212 } 213 return messageToUse; 214 } 215 216 public void triggerAfterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) { 217 for (int i = this.receiveInterceptorIndex; i >= 0; i--) { 218 ChannelInterceptor interceptor = interceptors.get(i); 219 try { 220 interceptor.afterReceiveCompletion(message, channel, ex); 221 } 222 catch (Throwable ex2) { 223 if (logger.isErrorEnabled()) { 224 logger.error("Exception from afterReceiveCompletion in " + interceptor, ex2); 225 } 226 } 227 } 228 } 229 } 230 231}