001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.support;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.List;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025
026import org.springframework.beans.factory.BeanNameAware;
027import org.springframework.messaging.Message;
028import org.springframework.messaging.MessageChannel;
029import org.springframework.messaging.MessageDeliveryException;
030import org.springframework.messaging.MessagingException;
031import org.springframework.util.Assert;
032import org.springframework.util.ObjectUtils;
033
034/**
035 * Abstract base class for {@link MessageChannel} implementations.
036 *
037 * @author Rossen Stoyanchev
038 * @since 4.0
039 */
040public abstract class AbstractMessageChannel implements MessageChannel, InterceptableChannel, BeanNameAware {
041
042        protected final Log logger = LogFactory.getLog(getClass());
043
044        private String beanName;
045
046        private final List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>(5);
047
048
049        public AbstractMessageChannel() {
050                this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
051        }
052
053
054        /**
055         * A message channel uses the bean name primarily for logging purposes.
056         */
057        @Override
058        public void setBeanName(String name) {
059                this.beanName = name;
060        }
061
062        /**
063         * Return the bean name for this message channel.
064         */
065        public String getBeanName() {
066                return this.beanName;
067        }
068
069
070        @Override
071        public void setInterceptors(List<ChannelInterceptor> interceptors) {
072                this.interceptors.clear();
073                this.interceptors.addAll(interceptors);
074        }
075
076        @Override
077        public void addInterceptor(ChannelInterceptor interceptor) {
078                Assert.notNull(interceptor, "ChannelInterceptor must not be null");
079                this.interceptors.add(interceptor);
080        }
081
082        @Override
083        public void addInterceptor(int index, ChannelInterceptor interceptor) {
084                Assert.notNull(interceptor, "ChannelInterceptor must not be null");
085                this.interceptors.add(index, interceptor);
086        }
087
088        @Override
089        public List<ChannelInterceptor> getInterceptors() {
090                return Collections.unmodifiableList(this.interceptors);
091        }
092
093        @Override
094        public boolean removeInterceptor(ChannelInterceptor interceptor) {
095                return this.interceptors.remove(interceptor);
096        }
097
098        @Override
099        public ChannelInterceptor removeInterceptor(int index) {
100                return this.interceptors.remove(index);
101        }
102
103
104        @Override
105        public final boolean send(Message<?> message) {
106                return send(message, INDEFINITE_TIMEOUT);
107        }
108
109        @Override
110        public final boolean send(Message<?> message, long timeout) {
111                Assert.notNull(message, "Message must not be null");
112                Message<?> messageToUse = message;
113                ChannelInterceptorChain chain = new ChannelInterceptorChain();
114                boolean sent = false;
115                try {
116                        messageToUse = chain.applyPreSend(messageToUse, this);
117                        if (messageToUse == null) {
118                                return false;
119                        }
120                        sent = sendInternal(messageToUse, timeout);
121                        chain.applyPostSend(messageToUse, this, sent);
122                        chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
123                        return sent;
124                }
125                catch (Exception ex) {
126                        chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
127                        if (ex instanceof MessagingException) {
128                                throw (MessagingException) ex;
129                        }
130                        throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
131                }
132                catch (Throwable err) {
133                        MessageDeliveryException ex2 =
134                                        new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
135                        chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
136                        throw ex2;
137                }
138        }
139
140        protected abstract boolean sendInternal(Message<?> message, long timeout);
141
142
143        @Override
144        public String toString() {
145                return getClass().getSimpleName() + "[" + this.beanName + "]";
146        }
147
148
149        /**
150         * Assists with the invocation of the configured channel interceptors.
151         */
152        protected class ChannelInterceptorChain {
153
154                private int sendInterceptorIndex = -1;
155
156                private int receiveInterceptorIndex = -1;
157
158                public Message<?> applyPreSend(Message<?> message, MessageChannel channel) {
159                        Message<?> messageToUse = message;
160                        for (ChannelInterceptor interceptor : interceptors) {
161                                Message<?> resolvedMessage = interceptor.preSend(messageToUse, channel);
162                                if (resolvedMessage == null) {
163                                        String name = interceptor.getClass().getSimpleName();
164                                        if (logger.isDebugEnabled()) {
165                                                logger.debug(name + " returned null from preSend, i.e. precluding the send.");
166                                        }
167                                        triggerAfterSendCompletion(messageToUse, channel, false, null);
168                                        return null;
169                                }
170                                messageToUse = resolvedMessage;
171                                this.sendInterceptorIndex++;
172                        }
173                        return messageToUse;
174                }
175
176                public void applyPostSend(Message<?> message, MessageChannel channel, boolean sent) {
177                        for (ChannelInterceptor interceptor : interceptors) {
178                                interceptor.postSend(message, channel, sent);
179                        }
180                }
181
182                public void triggerAfterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
183                        for (int i = this.sendInterceptorIndex; i >= 0; i--) {
184                                ChannelInterceptor interceptor = interceptors.get(i);
185                                try {
186                                        interceptor.afterSendCompletion(message, channel, sent, ex);
187                                }
188                                catch (Throwable ex2) {
189                                        logger.error("Exception from afterSendCompletion in " + interceptor, ex2);
190                                }
191                        }
192                }
193
194                public boolean applyPreReceive(MessageChannel channel) {
195                        for (ChannelInterceptor interceptor : interceptors) {
196                                if (!interceptor.preReceive(channel)) {
197                                        triggerAfterReceiveCompletion(null, channel, null);
198                                        return false;
199                                }
200                                this.receiveInterceptorIndex++;
201                        }
202                        return true;
203                }
204
205                public Message<?> applyPostReceive(Message<?> message, MessageChannel channel) {
206                        Message<?> messageToUse = message;
207                        for (ChannelInterceptor interceptor : interceptors) {
208                                messageToUse = interceptor.postReceive(messageToUse, channel);
209                                if (messageToUse == null) {
210                                        return null;
211                                }
212                        }
213                        return messageToUse;
214                }
215
216                public void triggerAfterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
217                        for (int i = this.receiveInterceptorIndex; i >= 0; i--) {
218                                ChannelInterceptor interceptor = interceptors.get(i);
219                                try {
220                                        interceptor.afterReceiveCompletion(message, channel, ex);
221                                }
222                                catch (Throwable ex2) {
223                                        if (logger.isErrorEnabled()) {
224                                                logger.error("Exception from afterReceiveCompletion in " + interceptor, ex2);
225                                        }
226                                }
227                        }
228                }
229        }
230
231}