001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.support;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.Executor;
022
023import org.springframework.messaging.Message;
024import org.springframework.messaging.MessageDeliveryException;
025import org.springframework.messaging.MessageHandler;
026import org.springframework.messaging.MessagingException;
027import org.springframework.messaging.SubscribableChannel;
028
029/**
030 * A {@link SubscribableChannel} that sends messages to each of its subscribers.
031 *
032 * @author Phillip Webb
033 * @author Rossen Stoyanchev
034 * @since 4.0
035 */
036public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
037
038        private final Executor executor;
039
040        private final List<ExecutorChannelInterceptor> executorInterceptors = new ArrayList<ExecutorChannelInterceptor>(4);
041
042
043        /**
044         * Create a new {@link ExecutorSubscribableChannel} instance
045         * where messages will be sent in the callers thread.
046         */
047        public ExecutorSubscribableChannel() {
048                this(null);
049        }
050
051        /**
052         * Create a new {@link ExecutorSubscribableChannel} instance
053         * where messages will be sent via the specified executor.
054         * @param executor the executor used to send the message,
055         * or {@code null} to execute in the callers thread.
056         */
057        public ExecutorSubscribableChannel(Executor executor) {
058                this.executor = executor;
059        }
060
061
062        public Executor getExecutor() {
063                return this.executor;
064        }
065
066        @Override
067        public void setInterceptors(List<ChannelInterceptor> interceptors) {
068                super.setInterceptors(interceptors);
069                this.executorInterceptors.clear();
070                for (ChannelInterceptor interceptor : interceptors) {
071                        if (interceptor instanceof ExecutorChannelInterceptor) {
072                                this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
073                        }
074                }
075        }
076
077        @Override
078        public void addInterceptor(ChannelInterceptor interceptor) {
079                super.addInterceptor(interceptor);
080                if (interceptor instanceof ExecutorChannelInterceptor) {
081                        this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
082                }
083        }
084
085
086        @Override
087        public boolean sendInternal(Message<?> message, long timeout) {
088                for (MessageHandler handler : getSubscribers()) {
089                        SendTask sendTask = new SendTask(message, handler);
090                        if (this.executor == null) {
091                                sendTask.run();
092                        }
093                        else {
094                                this.executor.execute(sendTask);
095                        }
096                }
097                return true;
098        }
099
100
101        /**
102         * Invoke a MessageHandler with ExecutorChannelInterceptors.
103         */
104        private class SendTask implements MessageHandlingRunnable {
105
106                private final Message<?> inputMessage;
107
108                private final MessageHandler messageHandler;
109
110                private int interceptorIndex = -1;
111
112                public SendTask(Message<?> message, MessageHandler messageHandler) {
113                        this.inputMessage = message;
114                        this.messageHandler = messageHandler;
115                }
116
117                @Override
118                public Message<?> getMessage() {
119                        return this.inputMessage;
120                }
121
122                @Override
123                public MessageHandler getMessageHandler() {
124                        return this.messageHandler;
125                }
126
127                @Override
128                public void run() {
129                        Message<?> message = this.inputMessage;
130                        try {
131                                message = applyBeforeHandle(message);
132                                if (message == null) {
133                                        return;
134                                }
135                                this.messageHandler.handleMessage(message);
136                                triggerAfterMessageHandled(message, null);
137                        }
138                        catch (Exception ex) {
139                                triggerAfterMessageHandled(message, ex);
140                                if (ex instanceof MessagingException) {
141                                        throw (MessagingException) ex;
142                                }
143                                String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
144                                throw new MessageDeliveryException(message, description, ex);
145                        }
146                        catch (Throwable err) {
147                                String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
148                                MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
149                                triggerAfterMessageHandled(message, ex2);
150                                throw ex2;
151                        }
152                }
153
154                private Message<?> applyBeforeHandle(Message<?> message) {
155                        Message<?> messageToUse = message;
156                        for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
157                                messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler);
158                                if (messageToUse == null) {
159                                        String name = interceptor.getClass().getSimpleName();
160                                        if (logger.isDebugEnabled()) {
161                                                logger.debug(name + " returned null from beforeHandle, i.e. precluding the send.");
162                                        }
163                                        triggerAfterMessageHandled(message, null);
164                                        return null;
165                                }
166                                this.interceptorIndex++;
167                        }
168                        return messageToUse;
169                }
170
171                private void triggerAfterMessageHandled(Message<?> message, Exception ex) {
172                        for (int i = this.interceptorIndex; i >= 0; i--) {
173                                ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
174                                try {
175                                        interceptor.afterMessageHandled(message, ExecutorSubscribableChannel.this, this.messageHandler, ex);
176                                }
177                                catch (Throwable ex2) {
178                                        logger.error("Exception from afterMessageHandled in " + interceptor, ex2);
179                                }
180                        }
181                }
182        }
183
184}