001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.aop.interceptor; 018 019import java.lang.reflect.Method; 020import java.util.Map; 021import java.util.concurrent.Callable; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.CompletionException; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Future; 027import java.util.function.Supplier; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import org.springframework.beans.factory.BeanFactory; 033import org.springframework.beans.factory.BeanFactoryAware; 034import org.springframework.beans.factory.NoSuchBeanDefinitionException; 035import org.springframework.beans.factory.NoUniqueBeanDefinitionException; 036import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; 037import org.springframework.core.task.AsyncListenableTaskExecutor; 038import org.springframework.core.task.AsyncTaskExecutor; 039import org.springframework.core.task.TaskExecutor; 040import org.springframework.core.task.support.TaskExecutorAdapter; 041import org.springframework.lang.UsesJava8; 042import org.springframework.util.ClassUtils; 043import org.springframework.util.ReflectionUtils; 044import org.springframework.util.StringUtils; 045import org.springframework.util.concurrent.ListenableFuture; 046 047/** 048 * Base class for asynchronous method execution aspects, such as 049 * {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor} 050 * or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}. 051 * 052 * <p>Provides support for <i>executor qualification</i> on a method-by-method basis. 053 * {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code 054 * Executor}, but each individual method may further qualify a specific {@code Executor} 055 * bean to be used when executing it, e.g. through an annotation attribute. 056 * 057 * @author Chris Beams 058 * @author Juergen Hoeller 059 * @author Stephane Nicoll 060 * @since 3.1.2 061 */ 062public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware { 063 064 /** 065 * The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor". 066 * <p>Note that the initial lookup happens by type; this is just the fallback 067 * in case of multiple executor beans found in the context. 068 * @since 4.2.6 069 */ 070 public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor"; 071 072 073 // Java 8's CompletableFuture type present? 074 private static final boolean completableFuturePresent = ClassUtils.isPresent( 075 "java.util.concurrent.CompletableFuture", AsyncExecutionInterceptor.class.getClassLoader()); 076 077 078 protected final Log logger = LogFactory.getLog(getClass()); 079 080 private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<Method, AsyncTaskExecutor>(16); 081 082 private volatile Executor defaultExecutor; 083 084 private AsyncUncaughtExceptionHandler exceptionHandler; 085 086 private BeanFactory beanFactory; 087 088 089 /** 090 * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}. 091 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor} 092 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific 093 * executor has been requested via a qualifier on the async method, in which case the 094 * executor will be looked up at invocation time against the enclosing bean factory 095 */ 096 public AsyncExecutionAspectSupport(Executor defaultExecutor) { 097 this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler()); 098 } 099 100 /** 101 * Create a new {@link AsyncExecutionAspectSupport} with the given exception handler. 102 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor} 103 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific 104 * executor has been requested via a qualifier on the async method, in which case the 105 * executor will be looked up at invocation time against the enclosing bean factory 106 * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use 107 */ 108 public AsyncExecutionAspectSupport(Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) { 109 this.defaultExecutor = defaultExecutor; 110 this.exceptionHandler = exceptionHandler; 111 } 112 113 114 /** 115 * Supply the executor to be used when executing async methods. 116 * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor} 117 * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific 118 * executor has been requested via a qualifier on the async method, in which case the 119 * executor will be looked up at invocation time against the enclosing bean factory 120 * @see #getExecutorQualifier(Method) 121 * @see #setBeanFactory(BeanFactory) 122 * @see #getDefaultExecutor(BeanFactory) 123 */ 124 public void setExecutor(Executor defaultExecutor) { 125 this.defaultExecutor = defaultExecutor; 126 } 127 128 /** 129 * Supply the {@link AsyncUncaughtExceptionHandler} to use to handle exceptions 130 * thrown by invoking asynchronous methods with a {@code void} return type. 131 */ 132 public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) { 133 this.exceptionHandler = exceptionHandler; 134 } 135 136 /** 137 * Set the {@link BeanFactory} to be used when looking up executors by qualifier 138 * or when relying on the default executor lookup algorithm. 139 * @see #findQualifiedExecutor(BeanFactory, String) 140 * @see #getDefaultExecutor(BeanFactory) 141 */ 142 @Override 143 public void setBeanFactory(BeanFactory beanFactory) { 144 this.beanFactory = beanFactory; 145 } 146 147 148 /** 149 * Determine the specific executor to use when executing the given method. 150 * Should preferably return an {@link AsyncListenableTaskExecutor} implementation. 151 * @return the executor to use (or {@code null}, but just if no default executor is available) 152 */ 153 protected AsyncTaskExecutor determineAsyncExecutor(Method method) { 154 AsyncTaskExecutor executor = this.executors.get(method); 155 if (executor == null) { 156 Executor targetExecutor; 157 String qualifier = getExecutorQualifier(method); 158 if (StringUtils.hasLength(qualifier)) { 159 targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); 160 } 161 else { 162 targetExecutor = this.defaultExecutor; 163 if (targetExecutor == null) { 164 synchronized (this.executors) { 165 if (this.defaultExecutor == null) { 166 this.defaultExecutor = getDefaultExecutor(this.beanFactory); 167 } 168 targetExecutor = this.defaultExecutor; 169 } 170 } 171 } 172 if (targetExecutor == null) { 173 return null; 174 } 175 executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? 176 (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); 177 this.executors.put(method, executor); 178 } 179 return executor; 180 } 181 182 /** 183 * Return the qualifier or bean name of the executor to be used when executing the 184 * given async method, typically specified in the form of an annotation attribute. 185 * Returning an empty string or {@code null} indicates that no specific executor has 186 * been specified and that the {@linkplain #setExecutor(Executor) default executor} 187 * should be used. 188 * @param method the method to inspect for executor qualifier metadata 189 * @return the qualifier if specified, otherwise empty String or {@code null} 190 * @see #determineAsyncExecutor(Method) 191 * @see #findQualifiedExecutor(BeanFactory, String) 192 */ 193 protected abstract String getExecutorQualifier(Method method); 194 195 /** 196 * Retrieve a target executor for the given qualifier. 197 * @param qualifier the qualifier to resolve 198 * @return the target executor, or {@code null} if none available 199 * @since 4.2.6 200 * @see #getExecutorQualifier(Method) 201 */ 202 protected Executor findQualifiedExecutor(BeanFactory beanFactory, String qualifier) { 203 if (beanFactory == null) { 204 throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() + 205 " to access qualified executor '" + qualifier + "'"); 206 } 207 return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier); 208 } 209 210 /** 211 * Retrieve or build a default executor for this advice instance. 212 * An executor returned from here will be cached for further use. 213 * <p>The default implementation searches for a unique {@link TaskExecutor} bean 214 * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. 215 * If neither of the two is resolvable, this implementation will return {@code null}. 216 * @param beanFactory the BeanFactory to use for a default executor lookup 217 * @return the default executor, or {@code null} if none available 218 * @since 4.2.6 219 * @see #findQualifiedExecutor(BeanFactory, String) 220 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME 221 */ 222 protected Executor getDefaultExecutor(BeanFactory beanFactory) { 223 if (beanFactory != null) { 224 try { 225 // Search for TaskExecutor bean... not plain Executor since that would 226 // match with ScheduledExecutorService as well, which is unusable for 227 // our purposes here. TaskExecutor is more clearly designed for it. 228 return beanFactory.getBean(TaskExecutor.class); 229 } 230 catch (NoUniqueBeanDefinitionException ex) { 231 logger.debug("Could not find unique TaskExecutor bean", ex); 232 try { 233 return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); 234 } 235 catch (NoSuchBeanDefinitionException ex2) { 236 if (logger.isInfoEnabled()) { 237 logger.info("More than one TaskExecutor bean found within the context, and none is named " + 238 "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + 239 "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); 240 } 241 } 242 } 243 catch (NoSuchBeanDefinitionException ex) { 244 logger.debug("Could not find default TaskExecutor bean", ex); 245 try { 246 return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); 247 } 248 catch (NoSuchBeanDefinitionException ex2) { 249 logger.info("No task executor bean found for async processing: " + 250 "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); 251 } 252 // Giving up -> either using local default executor or none at all... 253 } 254 } 255 return null; 256 } 257 258 259 /** 260 * Delegate for actually executing the given task with the chosen executor. 261 * @param task the task to execute 262 * @param executor the chosen executor 263 * @param returnType the declared return type (potentially a {@link Future} variant) 264 * @return the execution result (potentially a corresponding {@link Future} handle) 265 */ 266 protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { 267 if (completableFuturePresent) { 268 Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor); 269 if (result != null) { 270 return result; 271 } 272 } 273 if (ListenableFuture.class.isAssignableFrom(returnType)) { 274 return ((AsyncListenableTaskExecutor) executor).submitListenable(task); 275 } 276 else if (Future.class.isAssignableFrom(returnType)) { 277 return executor.submit(task); 278 } 279 else { 280 executor.submit(task); 281 return null; 282 } 283 } 284 285 /** 286 * Handles a fatal error thrown while asynchronously invoking the specified 287 * {@link Method}. 288 * <p>If the return type of the method is a {@link Future} object, the original 289 * exception can be propagated by just throwing it at the higher level. However, 290 * for all other cases, the exception will not be transmitted back to the client. 291 * In that later case, the current {@link AsyncUncaughtExceptionHandler} will be 292 * used to manage such exception. 293 * @param ex the exception to handle 294 * @param method the method that was invoked 295 * @param params the parameters used to invoke the method 296 */ 297 protected void handleError(Throwable ex, Method method, Object... params) throws Exception { 298 if (Future.class.isAssignableFrom(method.getReturnType())) { 299 ReflectionUtils.rethrowException(ex); 300 } 301 else { 302 // Could not transmit the exception to the caller with default executor 303 try { 304 this.exceptionHandler.handleUncaughtException(ex, method, params); 305 } 306 catch (Throwable ex2) { 307 logger.error("Exception handler for async method '" + method.toGenericString() + 308 "' threw unexpected exception itself", ex2); 309 } 310 } 311 } 312 313 314 /** 315 * Inner class to avoid a hard dependency on Java 8. 316 */ 317 @UsesJava8 318 private static class CompletableFutureDelegate { 319 320 public static <T> Future<T> processCompletableFuture(Class<?> returnType, final Callable<T> task, Executor executor) { 321 if (!CompletableFuture.class.isAssignableFrom(returnType)) { 322 return null; 323 } 324 return CompletableFuture.supplyAsync(new Supplier<T>() { 325 @Override 326 public T get() { 327 try { 328 return task.call(); 329 } 330 catch (Throwable ex) { 331 throw new CompletionException(ex); 332 } 333 } 334 }, executor); 335 } 336 } 337 338}