001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.support; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.Executor; 022 023import org.springframework.messaging.Message; 024import org.springframework.messaging.MessageDeliveryException; 025import org.springframework.messaging.MessageHandler; 026import org.springframework.messaging.MessagingException; 027import org.springframework.messaging.SubscribableChannel; 028 029/** 030 * A {@link SubscribableChannel} that sends messages to each of its subscribers. 031 * 032 * @author Phillip Webb 033 * @author Rossen Stoyanchev 034 * @since 4.0 035 */ 036public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { 037 038 private final Executor executor; 039 040 private final List<ExecutorChannelInterceptor> executorInterceptors = new ArrayList<ExecutorChannelInterceptor>(4); 041 042 043 /** 044 * Create a new {@link ExecutorSubscribableChannel} instance 045 * where messages will be sent in the callers thread. 046 */ 047 public ExecutorSubscribableChannel() { 048 this(null); 049 } 050 051 /** 052 * Create a new {@link ExecutorSubscribableChannel} instance 053 * where messages will be sent via the specified executor. 054 * @param executor the executor used to send the message, 055 * or {@code null} to execute in the callers thread. 056 */ 057 public ExecutorSubscribableChannel(Executor executor) { 058 this.executor = executor; 059 } 060 061 062 public Executor getExecutor() { 063 return this.executor; 064 } 065 066 @Override 067 public void setInterceptors(List<ChannelInterceptor> interceptors) { 068 super.setInterceptors(interceptors); 069 this.executorInterceptors.clear(); 070 for (ChannelInterceptor interceptor : interceptors) { 071 if (interceptor instanceof ExecutorChannelInterceptor) { 072 this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor); 073 } 074 } 075 } 076 077 @Override 078 public void addInterceptor(ChannelInterceptor interceptor) { 079 super.addInterceptor(interceptor); 080 if (interceptor instanceof ExecutorChannelInterceptor) { 081 this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor); 082 } 083 } 084 085 086 @Override 087 public boolean sendInternal(Message<?> message, long timeout) { 088 for (MessageHandler handler : getSubscribers()) { 089 SendTask sendTask = new SendTask(message, handler); 090 if (this.executor == null) { 091 sendTask.run(); 092 } 093 else { 094 this.executor.execute(sendTask); 095 } 096 } 097 return true; 098 } 099 100 101 /** 102 * Invoke a MessageHandler with ExecutorChannelInterceptors. 103 */ 104 private class SendTask implements MessageHandlingRunnable { 105 106 private final Message<?> inputMessage; 107 108 private final MessageHandler messageHandler; 109 110 private int interceptorIndex = -1; 111 112 public SendTask(Message<?> message, MessageHandler messageHandler) { 113 this.inputMessage = message; 114 this.messageHandler = messageHandler; 115 } 116 117 @Override 118 public Message<?> getMessage() { 119 return this.inputMessage; 120 } 121 122 @Override 123 public MessageHandler getMessageHandler() { 124 return this.messageHandler; 125 } 126 127 @Override 128 public void run() { 129 Message<?> message = this.inputMessage; 130 try { 131 message = applyBeforeHandle(message); 132 if (message == null) { 133 return; 134 } 135 this.messageHandler.handleMessage(message); 136 triggerAfterMessageHandled(message, null); 137 } 138 catch (Exception ex) { 139 triggerAfterMessageHandled(message, ex); 140 if (ex instanceof MessagingException) { 141 throw (MessagingException) ex; 142 } 143 String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler; 144 throw new MessageDeliveryException(message, description, ex); 145 } 146 catch (Throwable err) { 147 String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler; 148 MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err); 149 triggerAfterMessageHandled(message, ex2); 150 throw ex2; 151 } 152 } 153 154 private Message<?> applyBeforeHandle(Message<?> message) { 155 Message<?> messageToUse = message; 156 for (ExecutorChannelInterceptor interceptor : executorInterceptors) { 157 messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler); 158 if (messageToUse == null) { 159 String name = interceptor.getClass().getSimpleName(); 160 if (logger.isDebugEnabled()) { 161 logger.debug(name + " returned null from beforeHandle, i.e. precluding the send."); 162 } 163 triggerAfterMessageHandled(message, null); 164 return null; 165 } 166 this.interceptorIndex++; 167 } 168 return messageToUse; 169 } 170 171 private void triggerAfterMessageHandled(Message<?> message, Exception ex) { 172 for (int i = this.interceptorIndex; i >= 0; i--) { 173 ExecutorChannelInterceptor interceptor = executorInterceptors.get(i); 174 try { 175 interceptor.afterMessageHandled(message, ExecutorSubscribableChannel.this, this.messageHandler, ex); 176 } 177 catch (Throwable ex2) { 178 logger.error("Exception from afterMessageHandled in " + interceptor, ex2); 179 } 180 } 181 } 182 } 183 184}