001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.aop.interceptor;
018
019import java.lang.reflect.Method;
020import java.util.Map;
021import java.util.concurrent.Callable;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CompletionException;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Future;
027import java.util.function.Supplier;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import org.springframework.beans.factory.BeanFactory;
033import org.springframework.beans.factory.BeanFactoryAware;
034import org.springframework.beans.factory.NoSuchBeanDefinitionException;
035import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
036import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
037import org.springframework.core.task.AsyncListenableTaskExecutor;
038import org.springframework.core.task.AsyncTaskExecutor;
039import org.springframework.core.task.TaskExecutor;
040import org.springframework.core.task.support.TaskExecutorAdapter;
041import org.springframework.lang.UsesJava8;
042import org.springframework.util.ClassUtils;
043import org.springframework.util.ReflectionUtils;
044import org.springframework.util.StringUtils;
045import org.springframework.util.concurrent.ListenableFuture;
046
047/**
048 * Base class for asynchronous method execution aspects, such as
049 * {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor}
050 * or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}.
051 *
052 * <p>Provides support for <i>executor qualification</i> on a method-by-method basis.
053 * {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code
054 * Executor}, but each individual method may further qualify a specific {@code Executor}
055 * bean to be used when executing it, e.g. through an annotation attribute.
056 *
057 * @author Chris Beams
058 * @author Juergen Hoeller
059 * @author Stephane Nicoll
060 * @since 3.1.2
061 */
062public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
063
064        /**
065         * The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".
066         * <p>Note that the initial lookup happens by type; this is just the fallback
067         * in case of multiple executor beans found in the context.
068         * @since 4.2.6
069         */
070        public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
071
072
073        // Java 8's CompletableFuture type present?
074        private static final boolean completableFuturePresent = ClassUtils.isPresent(
075                        "java.util.concurrent.CompletableFuture", AsyncExecutionInterceptor.class.getClassLoader());
076
077
078        protected final Log logger = LogFactory.getLog(getClass());
079
080        private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<Method, AsyncTaskExecutor>(16);
081
082        private volatile Executor defaultExecutor;
083
084        private AsyncUncaughtExceptionHandler exceptionHandler;
085
086        private BeanFactory beanFactory;
087
088
089        /**
090         * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.
091         * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
092         * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
093         * executor has been requested via a qualifier on the async method, in which case the
094         * executor will be looked up at invocation time against the enclosing bean factory
095         */
096        public AsyncExecutionAspectSupport(Executor defaultExecutor) {
097                this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());
098        }
099
100        /**
101         * Create a new {@link AsyncExecutionAspectSupport} with the given exception handler.
102         * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
103         * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
104         * executor has been requested via a qualifier on the async method, in which case the
105         * executor will be looked up at invocation time against the enclosing bean factory
106         * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use
107         */
108        public AsyncExecutionAspectSupport(Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
109                this.defaultExecutor = defaultExecutor;
110                this.exceptionHandler = exceptionHandler;
111        }
112
113
114        /**
115         * Supply the executor to be used when executing async methods.
116         * @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}
117         * or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific
118         * executor has been requested via a qualifier on the async method, in which case the
119         * executor will be looked up at invocation time against the enclosing bean factory
120         * @see #getExecutorQualifier(Method)
121         * @see #setBeanFactory(BeanFactory)
122         * @see #getDefaultExecutor(BeanFactory)
123         */
124        public void setExecutor(Executor defaultExecutor) {
125                this.defaultExecutor = defaultExecutor;
126        }
127
128        /**
129         * Supply the {@link AsyncUncaughtExceptionHandler} to use to handle exceptions
130         * thrown by invoking asynchronous methods with a {@code void} return type.
131         */
132        public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
133                this.exceptionHandler = exceptionHandler;
134        }
135
136        /**
137         * Set the {@link BeanFactory} to be used when looking up executors by qualifier
138         * or when relying on the default executor lookup algorithm.
139         * @see #findQualifiedExecutor(BeanFactory, String)
140         * @see #getDefaultExecutor(BeanFactory)
141         */
142        @Override
143        public void setBeanFactory(BeanFactory beanFactory) {
144                this.beanFactory = beanFactory;
145        }
146
147
148        /**
149         * Determine the specific executor to use when executing the given method.
150         * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
151         * @return the executor to use (or {@code null}, but just if no default executor is available)
152         */
153        protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
154                AsyncTaskExecutor executor = this.executors.get(method);
155                if (executor == null) {
156                        Executor targetExecutor;
157                        String qualifier = getExecutorQualifier(method);
158                        if (StringUtils.hasLength(qualifier)) {
159                                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
160                        }
161                        else {
162                                targetExecutor = this.defaultExecutor;
163                                if (targetExecutor == null) {
164                                        synchronized (this.executors) {
165                                                if (this.defaultExecutor == null) {
166                                                        this.defaultExecutor = getDefaultExecutor(this.beanFactory);
167                                                }
168                                                targetExecutor = this.defaultExecutor;
169                                        }
170                                }
171                        }
172                        if (targetExecutor == null) {
173                                return null;
174                        }
175                        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
176                                        (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
177                        this.executors.put(method, executor);
178                }
179                return executor;
180        }
181
182        /**
183         * Return the qualifier or bean name of the executor to be used when executing the
184         * given async method, typically specified in the form of an annotation attribute.
185         * Returning an empty string or {@code null} indicates that no specific executor has
186         * been specified and that the {@linkplain #setExecutor(Executor) default executor}
187         * should be used.
188         * @param method the method to inspect for executor qualifier metadata
189         * @return the qualifier if specified, otherwise empty String or {@code null}
190         * @see #determineAsyncExecutor(Method)
191         * @see #findQualifiedExecutor(BeanFactory, String)
192         */
193        protected abstract String getExecutorQualifier(Method method);
194
195        /**
196         * Retrieve a target executor for the given qualifier.
197         * @param qualifier the qualifier to resolve
198         * @return the target executor, or {@code null} if none available
199         * @since 4.2.6
200         * @see #getExecutorQualifier(Method)
201         */
202        protected Executor findQualifiedExecutor(BeanFactory beanFactory, String qualifier) {
203                if (beanFactory == null) {
204                        throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
205                                        " to access qualified executor '" + qualifier + "'");
206                }
207                return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
208        }
209
210        /**
211         * Retrieve or build a default executor for this advice instance.
212         * An executor returned from here will be cached for further use.
213         * <p>The default implementation searches for a unique {@link TaskExecutor} bean
214         * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
215         * If neither of the two is resolvable, this implementation will return {@code null}.
216         * @param beanFactory the BeanFactory to use for a default executor lookup
217         * @return the default executor, or {@code null} if none available
218         * @since 4.2.6
219         * @see #findQualifiedExecutor(BeanFactory, String)
220         * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
221         */
222        protected Executor getDefaultExecutor(BeanFactory beanFactory) {
223                if (beanFactory != null) {
224                        try {
225                                // Search for TaskExecutor bean... not plain Executor since that would
226                                // match with ScheduledExecutorService as well, which is unusable for
227                                // our purposes here. TaskExecutor is more clearly designed for it.
228                                return beanFactory.getBean(TaskExecutor.class);
229                        }
230                        catch (NoUniqueBeanDefinitionException ex) {
231                                logger.debug("Could not find unique TaskExecutor bean", ex);
232                                try {
233                                        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
234                                }
235                                catch (NoSuchBeanDefinitionException ex2) {
236                                        if (logger.isInfoEnabled()) {
237                                                logger.info("More than one TaskExecutor bean found within the context, and none is named " +
238                                                                "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
239                                                                "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
240                                        }
241                                }
242                        }
243                        catch (NoSuchBeanDefinitionException ex) {
244                                logger.debug("Could not find default TaskExecutor bean", ex);
245                                try {
246                                        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
247                                }
248                                catch (NoSuchBeanDefinitionException ex2) {
249                                        logger.info("No task executor bean found for async processing: " +
250                                                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
251                                }
252                                // Giving up -> either using local default executor or none at all...
253                        }
254                }
255                return null;
256        }
257
258
259        /**
260         * Delegate for actually executing the given task with the chosen executor.
261         * @param task the task to execute
262         * @param executor the chosen executor
263         * @param returnType the declared return type (potentially a {@link Future} variant)
264         * @return the execution result (potentially a corresponding {@link Future} handle)
265         */
266        protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
267                if (completableFuturePresent) {
268                        Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
269                        if (result != null) {
270                                return result;
271                        }
272                }
273                if (ListenableFuture.class.isAssignableFrom(returnType)) {
274                        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
275                }
276                else if (Future.class.isAssignableFrom(returnType)) {
277                        return executor.submit(task);
278                }
279                else {
280                        executor.submit(task);
281                        return null;
282                }
283        }
284
285        /**
286         * Handles a fatal error thrown while asynchronously invoking the specified
287         * {@link Method}.
288         * <p>If the return type of the method is a {@link Future} object, the original
289         * exception can be propagated by just throwing it at the higher level. However,
290         * for all other cases, the exception will not be transmitted back to the client.
291         * In that later case, the current {@link AsyncUncaughtExceptionHandler} will be
292         * used to manage such exception.
293         * @param ex the exception to handle
294         * @param method the method that was invoked
295         * @param params the parameters used to invoke the method
296         */
297        protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
298                if (Future.class.isAssignableFrom(method.getReturnType())) {
299                        ReflectionUtils.rethrowException(ex);
300                }
301                else {
302                        // Could not transmit the exception to the caller with default executor
303                        try {
304                                this.exceptionHandler.handleUncaughtException(ex, method, params);
305                        }
306                        catch (Throwable ex2) {
307                                logger.error("Exception handler for async method '" + method.toGenericString() +
308                                                "' threw unexpected exception itself", ex2);
309                        }
310                }
311        }
312
313
314        /**
315         * Inner class to avoid a hard dependency on Java 8.
316         */
317        @UsesJava8
318        private static class CompletableFutureDelegate {
319
320                public static <T> Future<T> processCompletableFuture(Class<?> returnType, final Callable<T> task, Executor executor) {
321                        if (!CompletableFuture.class.isAssignableFrom(returnType)) {
322                                return null;
323                        }
324                        return CompletableFuture.supplyAsync(new Supplier<T>() {
325                                @Override
326                                public T get() {
327                                        try {
328                                                return task.call();
329                                        }
330                                        catch (Throwable ex) {
331                                                throw new CompletionException(ex);
332                                        }
333                                }
334                        }, executor);
335                }
336        }
337
338}