001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.support;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.List;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025
026import org.springframework.beans.factory.BeanNameAware;
027import org.springframework.lang.Nullable;
028import org.springframework.messaging.Message;
029import org.springframework.messaging.MessageChannel;
030import org.springframework.messaging.MessageDeliveryException;
031import org.springframework.messaging.MessagingException;
032import org.springframework.util.Assert;
033import org.springframework.util.ObjectUtils;
034
035/**
036 * Abstract base class for {@link MessageChannel} implementations.
037 *
038 * @author Rossen Stoyanchev
039 * @since 4.0
040 */
041public abstract class AbstractMessageChannel implements MessageChannel, InterceptableChannel, BeanNameAware {
042
043        protected Log logger = LogFactory.getLog(getClass());
044
045        private String beanName;
046
047        private final List<ChannelInterceptor> interceptors = new ArrayList<>(5);
048
049
050        public AbstractMessageChannel() {
051                this.beanName = getClass().getSimpleName() + "@" + ObjectUtils.getIdentityHexString(this);
052        }
053
054
055        /**
056         * Set an alternative logger to use than the one based on the class name.
057         * @param logger the logger to use
058         * @since 5.1
059         */
060        public void setLogger(Log logger) {
061                this.logger = logger;
062        }
063
064        /**
065         * Return the currently configured Logger.
066         * @since 5.1
067         */
068        public Log getLogger() {
069                return logger;
070        }
071
072        /**
073         * A message channel uses the bean name primarily for logging purposes.
074         */
075        @Override
076        public void setBeanName(String name) {
077                this.beanName = name;
078        }
079
080        /**
081         * Return the bean name for this message channel.
082         */
083        public String getBeanName() {
084                return this.beanName;
085        }
086
087
088        @Override
089        public void setInterceptors(List<ChannelInterceptor> interceptors) {
090                Assert.noNullElements(interceptors, "'interceptors' must not contain null elements");
091                this.interceptors.clear();
092                this.interceptors.addAll(interceptors);
093        }
094
095        @Override
096        public void addInterceptor(ChannelInterceptor interceptor) {
097                Assert.notNull(interceptor, "'interceptor' must not be null");
098                this.interceptors.add(interceptor);
099        }
100
101        @Override
102        public void addInterceptor(int index, ChannelInterceptor interceptor) {
103                Assert.notNull(interceptor, "'interceptor' must not be null");
104                this.interceptors.add(index, interceptor);
105        }
106
107        @Override
108        public List<ChannelInterceptor> getInterceptors() {
109                return Collections.unmodifiableList(this.interceptors);
110        }
111
112        @Override
113        public boolean removeInterceptor(ChannelInterceptor interceptor) {
114                return this.interceptors.remove(interceptor);
115        }
116
117        @Override
118        public ChannelInterceptor removeInterceptor(int index) {
119                return this.interceptors.remove(index);
120        }
121
122
123        @Override
124        public final boolean send(Message<?> message) {
125                return send(message, INDEFINITE_TIMEOUT);
126        }
127
128        @Override
129        public final boolean send(Message<?> message, long timeout) {
130                Assert.notNull(message, "Message must not be null");
131                Message<?> messageToUse = message;
132                ChannelInterceptorChain chain = new ChannelInterceptorChain();
133                boolean sent = false;
134                try {
135                        messageToUse = chain.applyPreSend(messageToUse, this);
136                        if (messageToUse == null) {
137                                return false;
138                        }
139                        sent = sendInternal(messageToUse, timeout);
140                        chain.applyPostSend(messageToUse, this, sent);
141                        chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
142                        return sent;
143                }
144                catch (Exception ex) {
145                        chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
146                        if (ex instanceof MessagingException) {
147                                throw (MessagingException) ex;
148                        }
149                        throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
150                }
151                catch (Throwable err) {
152                        MessageDeliveryException ex2 =
153                                        new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
154                        chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
155                        throw ex2;
156                }
157        }
158
159        protected abstract boolean sendInternal(Message<?> message, long timeout);
160
161
162        @Override
163        public String toString() {
164                return getClass().getSimpleName() + "[" + this.beanName + "]";
165        }
166
167
168        /**
169         * Assists with the invocation of the configured channel interceptors.
170         */
171        protected class ChannelInterceptorChain {
172
173                private int sendInterceptorIndex = -1;
174
175                private int receiveInterceptorIndex = -1;
176
177                @Nullable
178                public Message<?> applyPreSend(Message<?> message, MessageChannel channel) {
179                        Message<?> messageToUse = message;
180                        for (ChannelInterceptor interceptor : interceptors) {
181                                Message<?> resolvedMessage = interceptor.preSend(messageToUse, channel);
182                                if (resolvedMessage == null) {
183                                        String name = interceptor.getClass().getSimpleName();
184                                        if (logger.isDebugEnabled()) {
185                                                logger.debug(name + " returned null from preSend, i.e. precluding the send.");
186                                        }
187                                        triggerAfterSendCompletion(messageToUse, channel, false, null);
188                                        return null;
189                                }
190                                messageToUse = resolvedMessage;
191                                this.sendInterceptorIndex++;
192                        }
193                        return messageToUse;
194                }
195
196                public void applyPostSend(Message<?> message, MessageChannel channel, boolean sent) {
197                        for (ChannelInterceptor interceptor : interceptors) {
198                                interceptor.postSend(message, channel, sent);
199                        }
200                }
201
202                public void triggerAfterSendCompletion(Message<?> message, MessageChannel channel,
203                                boolean sent, @Nullable Exception ex) {
204
205                        for (int i = this.sendInterceptorIndex; i >= 0; i--) {
206                                ChannelInterceptor interceptor = interceptors.get(i);
207                                try {
208                                        interceptor.afterSendCompletion(message, channel, sent, ex);
209                                }
210                                catch (Throwable ex2) {
211                                        logger.error("Exception from afterSendCompletion in " + interceptor, ex2);
212                                }
213                        }
214                }
215
216                public boolean applyPreReceive(MessageChannel channel) {
217                        for (ChannelInterceptor interceptor : interceptors) {
218                                if (!interceptor.preReceive(channel)) {
219                                        triggerAfterReceiveCompletion(null, channel, null);
220                                        return false;
221                                }
222                                this.receiveInterceptorIndex++;
223                        }
224                        return true;
225                }
226
227                @Nullable
228                public Message<?> applyPostReceive(Message<?> message, MessageChannel channel) {
229                        Message<?> messageToUse = message;
230                        for (ChannelInterceptor interceptor : interceptors) {
231                                messageToUse = interceptor.postReceive(messageToUse, channel);
232                                if (messageToUse == null) {
233                                        return null;
234                                }
235                        }
236                        return messageToUse;
237                }
238
239                public void triggerAfterReceiveCompletion(
240                                @Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
241
242                        for (int i = this.receiveInterceptorIndex; i >= 0; i--) {
243                                ChannelInterceptor interceptor = interceptors.get(i);
244                                try {
245                                        interceptor.afterReceiveCompletion(message, channel, ex);
246                                }
247                                catch (Throwable ex2) {
248                                        if (logger.isErrorEnabled()) {
249                                                logger.error("Exception from afterReceiveCompletion in " + interceptor, ex2);
250                                        }
251                                }
252                        }
253                }
254        }
255
256}