001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.aop.interceptor; 018 019import java.lang.reflect.Method; 020import java.util.Map; 021import java.util.concurrent.Callable; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.CompletionException; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Future; 027import java.util.function.Supplier; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import org.springframework.beans.factory.BeanFactory; 033import org.springframework.beans.factory.BeanFactoryAware; 034import org.springframework.beans.factory.NoSuchBeanDefinitionException; 035import org.springframework.beans.factory.NoUniqueBeanDefinitionException; 036import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; 037import org.springframework.core.task.AsyncListenableTaskExecutor; 038import org.springframework.core.task.AsyncTaskExecutor; 039import org.springframework.core.task.TaskExecutor; 040import org.springframework.core.task.support.TaskExecutorAdapter; 041import org.springframework.lang.Nullable; 042import org.springframework.util.ReflectionUtils; 043import org.springframework.util.StringUtils; 044import org.springframework.util.concurrent.ListenableFuture; 045import org.springframework.util.function.SingletonSupplier; 046 047/** 048 * Base class for asynchronous method execution aspects, such as 049 * {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor} 050 * or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}. 051 * 052 * <p>Provides support for <i>executor qualification</i> on a method-by-method basis. 053 * {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code 054 * Executor}, but each individual method may further qualify a specific {@code Executor} 055 * bean to be used when executing it, e.g. through an annotation attribute. 056 * 057 * @author Chris Beams 058 * @author Juergen Hoeller 059 * @author Stephane Nicoll 060 * @since 3.1.2 061 */ 062public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware { 063 064 /** 065 * The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor". 066 * <p>Note that the initial lookup happens by type; this is just the fallback 067 * in case of multiple executor beans found in the context. 068 * @since 4.2.6 069 */ 070 public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor"; 071 072 073 protected final Log logger = LogFactory.getLog(getClass()); 074 075 private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16); 076 077 private SingletonSupplier<Executor> defaultExecutor; 078 079 private SingletonSupplier<AsyncUncaughtExceptionHandler> exceptionHandler; 080 081 @Nullable 082 private BeanFactory beanFactory; 083 084 085 /** 086 * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}. 087 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor} 088 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific 089 * executor has been requested via a qualifier on the async method, in which case the 090 * executor will be looked up at invocation time against the enclosing bean factory 091 */ 092 public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) { 093 this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); 094 this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new); 095 } 096 097 /** 098 * Create a new {@link AsyncExecutionAspectSupport} with the given exception handler. 099 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor} 100 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific 101 * executor has been requested via a qualifier on the async method, in which case the 102 * executor will be looked up at invocation time against the enclosing bean factory 103 * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use 104 */ 105 public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) { 106 this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); 107 this.exceptionHandler = SingletonSupplier.of(exceptionHandler); 108 } 109 110 111 /** 112 * Configure this aspect with the given executor and exception handler suppliers, 113 * applying the corresponding default if a supplier is not resolvable. 114 * @since 5.1 115 */ 116 public void configure(@Nullable Supplier<Executor> defaultExecutor, 117 @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { 118 119 this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); 120 this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new); 121 } 122 123 /** 124 * Supply the executor to be used when executing async methods. 125 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor} 126 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific 127 * executor has been requested via a qualifier on the async method, in which case the 128 * executor will be looked up at invocation time against the enclosing bean factory 129 * @see #getExecutorQualifier(Method) 130 * @see #setBeanFactory(BeanFactory) 131 * @see #getDefaultExecutor(BeanFactory) 132 */ 133 public void setExecutor(Executor defaultExecutor) { 134 this.defaultExecutor = SingletonSupplier.of(defaultExecutor); 135 } 136 137 /** 138 * Supply the {@link AsyncUncaughtExceptionHandler} to use to handle exceptions 139 * thrown by invoking asynchronous methods with a {@code void} return type. 140 */ 141 public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) { 142 this.exceptionHandler = SingletonSupplier.of(exceptionHandler); 143 } 144 145 /** 146 * Set the {@link BeanFactory} to be used when looking up executors by qualifier 147 * or when relying on the default executor lookup algorithm. 148 * @see #findQualifiedExecutor(BeanFactory, String) 149 * @see #getDefaultExecutor(BeanFactory) 150 */ 151 @Override 152 public void setBeanFactory(BeanFactory beanFactory) { 153 this.beanFactory = beanFactory; 154 } 155 156 157 /** 158 * Determine the specific executor to use when executing the given method. 159 * Should preferably return an {@link AsyncListenableTaskExecutor} implementation. 160 * @return the executor to use (or {@code null}, but just if no default executor is available) 161 */ 162 @Nullable 163 protected AsyncTaskExecutor determineAsyncExecutor(Method method) { 164 AsyncTaskExecutor executor = this.executors.get(method); 165 if (executor == null) { 166 Executor targetExecutor; 167 String qualifier = getExecutorQualifier(method); 168 if (StringUtils.hasLength(qualifier)) { 169 targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); 170 } 171 else { 172 targetExecutor = this.defaultExecutor.get(); 173 } 174 if (targetExecutor == null) { 175 return null; 176 } 177 executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? 178 (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); 179 this.executors.put(method, executor); 180 } 181 return executor; 182 } 183 184 /** 185 * Return the qualifier or bean name of the executor to be used when executing the 186 * given async method, typically specified in the form of an annotation attribute. 187 * Returning an empty string or {@code null} indicates that no specific executor has 188 * been specified and that the {@linkplain #setExecutor(Executor) default executor} 189 * should be used. 190 * @param method the method to inspect for executor qualifier metadata 191 * @return the qualifier if specified, otherwise empty String or {@code null} 192 * @see #determineAsyncExecutor(Method) 193 * @see #findQualifiedExecutor(BeanFactory, String) 194 */ 195 @Nullable 196 protected abstract String getExecutorQualifier(Method method); 197 198 /** 199 * Retrieve a target executor for the given qualifier. 200 * @param qualifier the qualifier to resolve 201 * @return the target executor, or {@code null} if none available 202 * @since 4.2.6 203 * @see #getExecutorQualifier(Method) 204 */ 205 @Nullable 206 protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) { 207 if (beanFactory == null) { 208 throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() + 209 " to access qualified executor '" + qualifier + "'"); 210 } 211 return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier); 212 } 213 214 /** 215 * Retrieve or build a default executor for this advice instance. 216 * An executor returned from here will be cached for further use. 217 * <p>The default implementation searches for a unique {@link TaskExecutor} bean 218 * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. 219 * If neither of the two is resolvable, this implementation will return {@code null}. 220 * @param beanFactory the BeanFactory to use for a default executor lookup 221 * @return the default executor, or {@code null} if none available 222 * @since 4.2.6 223 * @see #findQualifiedExecutor(BeanFactory, String) 224 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME 225 */ 226 @Nullable 227 protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { 228 if (beanFactory != null) { 229 try { 230 // Search for TaskExecutor bean... not plain Executor since that would 231 // match with ScheduledExecutorService as well, which is unusable for 232 // our purposes here. TaskExecutor is more clearly designed for it. 233 return beanFactory.getBean(TaskExecutor.class); 234 } 235 catch (NoUniqueBeanDefinitionException ex) { 236 logger.debug("Could not find unique TaskExecutor bean", ex); 237 try { 238 return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); 239 } 240 catch (NoSuchBeanDefinitionException ex2) { 241 if (logger.isInfoEnabled()) { 242 logger.info("More than one TaskExecutor bean found within the context, and none is named " + 243 "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + 244 "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); 245 } 246 } 247 } 248 catch (NoSuchBeanDefinitionException ex) { 249 logger.debug("Could not find default TaskExecutor bean", ex); 250 try { 251 return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); 252 } 253 catch (NoSuchBeanDefinitionException ex2) { 254 logger.info("No task executor bean found for async processing: " + 255 "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); 256 } 257 // Giving up -> either using local default executor or none at all... 258 } 259 } 260 return null; 261 } 262 263 264 /** 265 * Delegate for actually executing the given task with the chosen executor. 266 * @param task the task to execute 267 * @param executor the chosen executor 268 * @param returnType the declared return type (potentially a {@link Future} variant) 269 * @return the execution result (potentially a corresponding {@link Future} handle) 270 */ 271 @Nullable 272 protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { 273 if (CompletableFuture.class.isAssignableFrom(returnType)) { 274 return CompletableFuture.supplyAsync(() -> { 275 try { 276 return task.call(); 277 } 278 catch (Throwable ex) { 279 throw new CompletionException(ex); 280 } 281 }, executor); 282 } 283 else if (ListenableFuture.class.isAssignableFrom(returnType)) { 284 return ((AsyncListenableTaskExecutor) executor).submitListenable(task); 285 } 286 else if (Future.class.isAssignableFrom(returnType)) { 287 return executor.submit(task); 288 } 289 else { 290 executor.submit(task); 291 return null; 292 } 293 } 294 295 /** 296 * Handles a fatal error thrown while asynchronously invoking the specified 297 * {@link Method}. 298 * <p>If the return type of the method is a {@link Future} object, the original 299 * exception can be propagated by just throwing it at the higher level. However, 300 * for all other cases, the exception will not be transmitted back to the client. 301 * In that later case, the current {@link AsyncUncaughtExceptionHandler} will be 302 * used to manage such exception. 303 * @param ex the exception to handle 304 * @param method the method that was invoked 305 * @param params the parameters used to invoke the method 306 */ 307 protected void handleError(Throwable ex, Method method, Object... params) throws Exception { 308 if (Future.class.isAssignableFrom(method.getReturnType())) { 309 ReflectionUtils.rethrowException(ex); 310 } 311 else { 312 // Could not transmit the exception to the caller with default executor 313 try { 314 this.exceptionHandler.obtain().handleUncaughtException(ex, method, params); 315 } 316 catch (Throwable ex2) { 317 logger.warn("Exception handler for async method '" + method.toGenericString() + 318 "' threw unexpected exception itself", ex2); 319 } 320 } 321 } 322 323}