001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.support; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.Executor; 022 023import org.springframework.lang.Nullable; 024import org.springframework.messaging.Message; 025import org.springframework.messaging.MessageDeliveryException; 026import org.springframework.messaging.MessageHandler; 027import org.springframework.messaging.MessagingException; 028import org.springframework.messaging.SubscribableChannel; 029 030/** 031 * A {@link SubscribableChannel} that sends messages to each of its subscribers. 032 * 033 * @author Phillip Webb 034 * @author Rossen Stoyanchev 035 * @since 4.0 036 */ 037public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { 038 039 @Nullable 040 private final Executor executor; 041 042 private final List<ExecutorChannelInterceptor> executorInterceptors = new ArrayList<>(4); 043 044 045 /** 046 * Create a new {@link ExecutorSubscribableChannel} instance 047 * where messages will be sent in the callers thread. 048 */ 049 public ExecutorSubscribableChannel() { 050 this(null); 051 } 052 053 /** 054 * Create a new {@link ExecutorSubscribableChannel} instance 055 * where messages will be sent via the specified executor. 056 * @param executor the executor used to send the message, 057 * or {@code null} to execute in the callers thread. 058 */ 059 public ExecutorSubscribableChannel(@Nullable Executor executor) { 060 this.executor = executor; 061 } 062 063 064 @Nullable 065 public Executor getExecutor() { 066 return this.executor; 067 } 068 069 @Override 070 public void setInterceptors(List<ChannelInterceptor> interceptors) { 071 super.setInterceptors(interceptors); 072 this.executorInterceptors.clear(); 073 interceptors.forEach(this::updateExecutorInterceptorsFor); 074 } 075 076 @Override 077 public void addInterceptor(ChannelInterceptor interceptor) { 078 super.addInterceptor(interceptor); 079 updateExecutorInterceptorsFor(interceptor); 080 } 081 082 @Override 083 public void addInterceptor(int index, ChannelInterceptor interceptor) { 084 super.addInterceptor(index, interceptor); 085 updateExecutorInterceptorsFor(interceptor); 086 } 087 088 private void updateExecutorInterceptorsFor(ChannelInterceptor interceptor) { 089 if (interceptor instanceof ExecutorChannelInterceptor) { 090 this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor); 091 } 092 } 093 094 095 @Override 096 public boolean sendInternal(Message<?> message, long timeout) { 097 for (MessageHandler handler : getSubscribers()) { 098 SendTask sendTask = new SendTask(message, handler); 099 if (this.executor == null) { 100 sendTask.run(); 101 } 102 else { 103 this.executor.execute(sendTask); 104 } 105 } 106 return true; 107 } 108 109 110 /** 111 * Invoke a MessageHandler with ExecutorChannelInterceptors. 112 */ 113 private class SendTask implements MessageHandlingRunnable { 114 115 private final Message<?> inputMessage; 116 117 private final MessageHandler messageHandler; 118 119 private int interceptorIndex = -1; 120 121 public SendTask(Message<?> message, MessageHandler messageHandler) { 122 this.inputMessage = message; 123 this.messageHandler = messageHandler; 124 } 125 126 @Override 127 public Message<?> getMessage() { 128 return this.inputMessage; 129 } 130 131 @Override 132 public MessageHandler getMessageHandler() { 133 return this.messageHandler; 134 } 135 136 @Override 137 public void run() { 138 Message<?> message = this.inputMessage; 139 try { 140 message = applyBeforeHandle(message); 141 if (message == null) { 142 return; 143 } 144 this.messageHandler.handleMessage(message); 145 triggerAfterMessageHandled(message, null); 146 } 147 catch (Exception ex) { 148 triggerAfterMessageHandled(message, ex); 149 if (ex instanceof MessagingException) { 150 throw (MessagingException) ex; 151 } 152 String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler; 153 throw new MessageDeliveryException(message, description, ex); 154 } 155 catch (Throwable err) { 156 String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler; 157 MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err); 158 triggerAfterMessageHandled(message, ex2); 159 throw ex2; 160 } 161 } 162 163 @Nullable 164 private Message<?> applyBeforeHandle(Message<?> message) { 165 Message<?> messageToUse = message; 166 for (ExecutorChannelInterceptor interceptor : executorInterceptors) { 167 messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler); 168 if (messageToUse == null) { 169 String name = interceptor.getClass().getSimpleName(); 170 if (logger.isDebugEnabled()) { 171 logger.debug(name + " returned null from beforeHandle, i.e. precluding the send."); 172 } 173 triggerAfterMessageHandled(message, null); 174 return null; 175 } 176 this.interceptorIndex++; 177 } 178 return messageToUse; 179 } 180 181 private void triggerAfterMessageHandled(Message<?> message, @Nullable Exception ex) { 182 for (int i = this.interceptorIndex; i >= 0; i--) { 183 ExecutorChannelInterceptor interceptor = executorInterceptors.get(i); 184 try { 185 interceptor.afterMessageHandled(message, ExecutorSubscribableChannel.this, this.messageHandler, ex); 186 } 187 catch (Throwable ex2) { 188 logger.error("Exception from afterMessageHandled in " + interceptor, ex2); 189 } 190 } 191 } 192 } 193 194}