/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.aop.interceptor;
018
019import java.lang.reflect.Method;
020import java.util.Map;
021import java.util.concurrent.Callable;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CompletionException;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Future;
027import java.util.function.Supplier;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import org.springframework.beans.factory.BeanFactory;
033import org.springframework.beans.factory.BeanFactoryAware;
034import org.springframework.beans.factory.NoSuchBeanDefinitionException;
035import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
036import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
037import org.springframework.core.task.AsyncListenableTaskExecutor;
038import org.springframework.core.task.AsyncTaskExecutor;
039import org.springframework.core.task.TaskExecutor;
040import org.springframework.core.task.support.TaskExecutorAdapter;
041import org.springframework.lang.Nullable;
042import org.springframework.util.ReflectionUtils;
043import org.springframework.util.StringUtils;
044import org.springframework.util.concurrent.ListenableFuture;
045import org.springframework.util.function.SingletonSupplier;
046
047/**
048 * Base class for asynchronous method execution aspects, such as
049 * {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor}
050 * or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}.
051 *
052 * <p>Provides support for <i>executor qualification</i> on a method-by-method basis.
053 * {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code
054 * Executor}, but each individual method may further qualify a specific {@code Executor}
055 * bean to be used when executing it, e.g. through an annotation attribute.
056 *
057 * @author Chris Beams
058 * @author Juergen Hoeller
059 * @author Stephane Nicoll
060 * @since 3.1.2
061 */
062public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
063
064        /**
065         * The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".
066         * <p>Note that the initial lookup happens by type; this is just the fallback
067         * in case of multiple executor beans found in the context.
068         * @since 4.2.6
069         */
070        public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
071
072
073        protected final Log logger = LogFactory.getLog(getClass());
074
075        private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
076
077        private SingletonSupplier<Executor> defaultExecutor;
078
079        private SingletonSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;
080
081        @Nullable
082        private BeanFactory beanFactory;
083
084
085        /**
086         * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.
087         * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
088         * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
089         * executor has been requested via a qualifier on the async method, in which case the
090         * executor will be looked up at invocation time against the enclosing bean factory
091         */
092        public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
093                this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
094                this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
095        }
096
097        /**
098         * Create a new {@link AsyncExecutionAspectSupport} with the given exception handler.
099         * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
100         * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
101         * executor has been requested via a qualifier on the async method, in which case the
102         * executor will be looked up at invocation time against the enclosing bean factory
103         * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use
104         */
105        public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
106                this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
107                this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
108        }
109
110
111        /**
112         * Configure this aspect with the given executor and exception handler suppliers,
113         * applying the corresponding default if a supplier is not resolvable.
114         * @since 5.1
115         */
116        public void configure(@Nullable Supplier<Executor> defaultExecutor,
117                        @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
118
119                this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
120                this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
121        }
122
123        /**
124         * Supply the executor to be used when executing async methods.
125         * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
126         * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
127         * executor has been requested via a qualifier on the async method, in which case the
128         * executor will be looked up at invocation time against the enclosing bean factory
129         * @see #getExecutorQualifier(Method)
130         * @see #setBeanFactory(BeanFactory)
131         * @see #getDefaultExecutor(BeanFactory)
132         */
133        public void setExecutor(Executor defaultExecutor) {
134                this.defaultExecutor = SingletonSupplier.of(defaultExecutor);
135        }
136
137        /**
138         * Supply the {@link AsyncUncaughtExceptionHandler} to use to handle exceptions
139         * thrown by invoking asynchronous methods with a {@code void} return type.
140         */
141        public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
142                this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
143        }
144
145        /**
146         * Set the {@link BeanFactory} to be used when looking up executors by qualifier
147         * or when relying on the default executor lookup algorithm.
148         * @see #findQualifiedExecutor(BeanFactory, String)
149         * @see #getDefaultExecutor(BeanFactory)
150         */
151        @Override
152        public void setBeanFactory(BeanFactory beanFactory) {
153                this.beanFactory = beanFactory;
154        }
155
156
157        /**
158         * Determine the specific executor to use when executing the given method.
159         * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
160         * @return the executor to use (or {@code null}, but just if no default executor is available)
161         */
162        @Nullable
163        protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
164                AsyncTaskExecutor executor = this.executors.get(method);
165                if (executor == null) {
166                        Executor targetExecutor;
167                        String qualifier = getExecutorQualifier(method);
168                        if (StringUtils.hasLength(qualifier)) {
169                                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
170                        }
171                        else {
172                                targetExecutor = this.defaultExecutor.get();
173                        }
174                        if (targetExecutor == null) {
175                                return null;
176                        }
177                        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
178                                        (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
179                        this.executors.put(method, executor);
180                }
181                return executor;
182        }
183
184        /**
185         * Return the qualifier or bean name of the executor to be used when executing the
186         * given async method, typically specified in the form of an annotation attribute.
187         * Returning an empty string or {@code null} indicates that no specific executor has
188         * been specified and that the {@linkplain #setExecutor(Executor) default executor}
189         * should be used.
190         * @param method the method to inspect for executor qualifier metadata
191         * @return the qualifier if specified, otherwise empty String or {@code null}
192         * @see #determineAsyncExecutor(Method)
193         * @see #findQualifiedExecutor(BeanFactory, String)
194         */
195        @Nullable
196        protected abstract String getExecutorQualifier(Method method);
197
198        /**
199         * Retrieve a target executor for the given qualifier.
200         * @param qualifier the qualifier to resolve
201         * @return the target executor, or {@code null} if none available
202         * @since 4.2.6
203         * @see #getExecutorQualifier(Method)
204         */
205        @Nullable
206        protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
207                if (beanFactory == null) {
208                        throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
209                                        " to access qualified executor '" + qualifier + "'");
210                }
211                return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
212        }
213
214        /**
215         * Retrieve or build a default executor for this advice instance.
216         * An executor returned from here will be cached for further use.
217         * <p>The default implementation searches for a unique {@link TaskExecutor} bean
218         * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
219         * If neither of the two is resolvable, this implementation will return {@code null}.
220         * @param beanFactory the BeanFactory to use for a default executor lookup
221         * @return the default executor, or {@code null} if none available
222         * @since 4.2.6
223         * @see #findQualifiedExecutor(BeanFactory, String)
224         * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
225         */
226        @Nullable
227        protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
228                if (beanFactory != null) {
229                        try {
230                                // Search for TaskExecutor bean... not plain Executor since that would
231                                // match with ScheduledExecutorService as well, which is unusable for
232                                // our purposes here. TaskExecutor is more clearly designed for it.
233                                return beanFactory.getBean(TaskExecutor.class);
234                        }
235                        catch (NoUniqueBeanDefinitionException ex) {
236                                logger.debug("Could not find unique TaskExecutor bean", ex);
237                                try {
238                                        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
239                                }
240                                catch (NoSuchBeanDefinitionException ex2) {
241                                        if (logger.isInfoEnabled()) {
242                                                logger.info("More than one TaskExecutor bean found within the context, and none is named " +
243                                                                "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
244                                                                "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
245                                        }
246                                }
247                        }
248                        catch (NoSuchBeanDefinitionException ex) {
249                                logger.debug("Could not find default TaskExecutor bean", ex);
250                                try {
251                                        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
252                                }
253                                catch (NoSuchBeanDefinitionException ex2) {
254                                        logger.info("No task executor bean found for async processing: " +
255                                                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
256                                }
257                                // Giving up -> either using local default executor or none at all...
258                        }
259                }
260                return null;
261        }
262
263
264        /**
265         * Delegate for actually executing the given task with the chosen executor.
266         * @param task the task to execute
267         * @param executor the chosen executor
268         * @param returnType the declared return type (potentially a {@link Future} variant)
269         * @return the execution result (potentially a corresponding {@link Future} handle)
270         */
271        @Nullable
272        protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
273                if (CompletableFuture.class.isAssignableFrom(returnType)) {
274                        return CompletableFuture.supplyAsync(() -> {
275                                try {
276                                        return task.call();
277                                }
278                                catch (Throwable ex) {
279                                        throw new CompletionException(ex);
280                                }
281                        }, executor);
282                }
283                else if (ListenableFuture.class.isAssignableFrom(returnType)) {
284                        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
285                }
286                else if (Future.class.isAssignableFrom(returnType)) {
287                        return executor.submit(task);
288                }
289                else {
290                        executor.submit(task);
291                        return null;
292                }
293        }
294
295        /**
296         * Handles a fatal error thrown while asynchronously invoking the specified
297         * {@link Method}.
298         * <p>If the return type of the method is a {@link Future} object, the original
299         * exception can be propagated by just throwing it at the higher level. However,
300         * for all other cases, the exception will not be transmitted back to the client.
301         * In that later case, the current {@link AsyncUncaughtExceptionHandler} will be
302         * used to manage such exception.
303         * @param ex the exception to handle
304         * @param method the method that was invoked
305         * @param params the parameters used to invoke the method
306         */
307        protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
308                if (Future.class.isAssignableFrom(method.getReturnType())) {
309                        ReflectionUtils.rethrowException(ex);
310                }
311                else {
312                        // Could not transmit the exception to the caller with default executor
313                        try {
314                                this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
315                        }
316                        catch (Throwable ex2) {
317                                logger.warn("Exception handler for async method '" + method.toGenericString() +
318                                                "' threw unexpected exception itself", ex2);
319                        }
320                }
321        }
322
323}