001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.support;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.Executor;
022
023import org.springframework.lang.Nullable;
024import org.springframework.messaging.Message;
025import org.springframework.messaging.MessageDeliveryException;
026import org.springframework.messaging.MessageHandler;
027import org.springframework.messaging.MessagingException;
028import org.springframework.messaging.SubscribableChannel;
029
030/**
031 * A {@link SubscribableChannel} that sends messages to each of its subscribers.
032 *
033 * @author Phillip Webb
034 * @author Rossen Stoyanchev
035 * @since 4.0
036 */
037public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
038
039        @Nullable
040        private final Executor executor;
041
042        private final List<ExecutorChannelInterceptor> executorInterceptors = new ArrayList<>(4);
043
044
045        /**
046         * Create a new {@link ExecutorSubscribableChannel} instance
047         * where messages will be sent in the callers thread.
048         */
049        public ExecutorSubscribableChannel() {
050                this(null);
051        }
052
053        /**
054         * Create a new {@link ExecutorSubscribableChannel} instance
055         * where messages will be sent via the specified executor.
056         * @param executor the executor used to send the message,
057         * or {@code null} to execute in the callers thread.
058         */
059        public ExecutorSubscribableChannel(@Nullable Executor executor) {
060                this.executor = executor;
061        }
062
063
064        @Nullable
065        public Executor getExecutor() {
066                return this.executor;
067        }
068
069        @Override
070        public void setInterceptors(List<ChannelInterceptor> interceptors) {
071                super.setInterceptors(interceptors);
072                this.executorInterceptors.clear();
073                interceptors.forEach(this::updateExecutorInterceptorsFor);
074        }
075
076        @Override
077        public void addInterceptor(ChannelInterceptor interceptor) {
078                super.addInterceptor(interceptor);
079                updateExecutorInterceptorsFor(interceptor);
080        }
081
082        @Override
083        public void addInterceptor(int index, ChannelInterceptor interceptor) {
084                super.addInterceptor(index, interceptor);
085                updateExecutorInterceptorsFor(interceptor);
086        }
087
088        private void updateExecutorInterceptorsFor(ChannelInterceptor interceptor) {
089                if (interceptor instanceof ExecutorChannelInterceptor) {
090                        this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
091                }
092        }
093
094
095        @Override
096        public boolean sendInternal(Message<?> message, long timeout) {
097                for (MessageHandler handler : getSubscribers()) {
098                        SendTask sendTask = new SendTask(message, handler);
099                        if (this.executor == null) {
100                                sendTask.run();
101                        }
102                        else {
103                                this.executor.execute(sendTask);
104                        }
105                }
106                return true;
107        }
108
109
110        /**
111         * Invoke a MessageHandler with ExecutorChannelInterceptors.
112         */
113        private class SendTask implements MessageHandlingRunnable {
114
115                private final Message<?> inputMessage;
116
117                private final MessageHandler messageHandler;
118
119                private int interceptorIndex = -1;
120
121                public SendTask(Message<?> message, MessageHandler messageHandler) {
122                        this.inputMessage = message;
123                        this.messageHandler = messageHandler;
124                }
125
126                @Override
127                public Message<?> getMessage() {
128                        return this.inputMessage;
129                }
130
131                @Override
132                public MessageHandler getMessageHandler() {
133                        return this.messageHandler;
134                }
135
136                @Override
137                public void run() {
138                        Message<?> message = this.inputMessage;
139                        try {
140                                message = applyBeforeHandle(message);
141                                if (message == null) {
142                                        return;
143                                }
144                                this.messageHandler.handleMessage(message);
145                                triggerAfterMessageHandled(message, null);
146                        }
147                        catch (Exception ex) {
148                                triggerAfterMessageHandled(message, ex);
149                                if (ex instanceof MessagingException) {
150                                        throw (MessagingException) ex;
151                                }
152                                String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
153                                throw new MessageDeliveryException(message, description, ex);
154                        }
155                        catch (Throwable err) {
156                                String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
157                                MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
158                                triggerAfterMessageHandled(message, ex2);
159                                throw ex2;
160                        }
161                }
162
163                @Nullable
164                private Message<?> applyBeforeHandle(Message<?> message) {
165                        Message<?> messageToUse = message;
166                        for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
167                                messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler);
168                                if (messageToUse == null) {
169                                        String name = interceptor.getClass().getSimpleName();
170                                        if (logger.isDebugEnabled()) {
171                                                logger.debug(name + " returned null from beforeHandle, i.e. precluding the send.");
172                                        }
173                                        triggerAfterMessageHandled(message, null);
174                                        return null;
175                                }
176                                this.interceptorIndex++;
177                        }
178                        return messageToUse;
179                }
180
181                private void triggerAfterMessageHandled(Message<?> message, @Nullable Exception ex) {
182                        for (int i = this.interceptorIndex; i >= 0; i--) {
183                                ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
184                                try {
185                                        interceptor.afterMessageHandled(message, ExecutorSubscribableChannel.this, this.messageHandler, ex);
186                                }
187                                catch (Throwable ex2) {
188                                        logger.error("Exception from afterMessageHandled in " + interceptor, ex2);
189                                }
190                        }
191                }
192        }
193
194}