001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.context.request.async;
018
019import java.util.ArrayList;
020import java.util.LinkedHashMap;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.Callable;
024import java.util.concurrent.Future;
025import java.util.concurrent.RejectedExecutionException;
026
027import javax.servlet.http.HttpServletRequest;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import org.springframework.core.task.AsyncTaskExecutor;
033import org.springframework.core.task.SimpleAsyncTaskExecutor;
034import org.springframework.core.task.SyncTaskExecutor;
035import org.springframework.lang.Nullable;
036import org.springframework.util.Assert;
037import org.springframework.web.context.request.RequestAttributes;
038import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
039
040/**
041 * The central class for managing asynchronous request processing, mainly intended
042 * as an SPI and not typically used directly by application classes.
043 *
044 * <p>An async scenario starts with request processing as usual in a thread (T1).
045 * Concurrent request handling can be initiated by calling
046 * {@link #startCallableProcessing(Callable, Object...) startCallableProcessing} or
047 * {@link #startDeferredResultProcessing(DeferredResult, Object...) startDeferredResultProcessing},
048 * both of which produce a result in a separate thread (T2). The result is saved
049 * and the request dispatched to the container, to resume processing with the saved
050 * result in a third thread (T3). Within the dispatched thread (T3), the saved
051 * result can be accessed via {@link #getConcurrentResult()} or its presence
052 * detected via {@link #hasConcurrentResult()}.
053 *
054 * @author Rossen Stoyanchev
055 * @author Juergen Hoeller
056 * @since 3.2
057 * @see org.springframework.web.context.request.AsyncWebRequestInterceptor
058 * @see org.springframework.web.servlet.AsyncHandlerInterceptor
059 * @see org.springframework.web.filter.OncePerRequestFilter#shouldNotFilterAsyncDispatch
060 * @see org.springframework.web.filter.OncePerRequestFilter#isAsyncDispatch
061 */
062public final class WebAsyncManager {
063
064        private static final Object RESULT_NONE = new Object();
065
066        private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR =
067                        new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName());
068
069        private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
070
071        private static final CallableProcessingInterceptor timeoutCallableInterceptor =
072                        new TimeoutCallableProcessingInterceptor();
073
074        private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
075                        new TimeoutDeferredResultProcessingInterceptor();
076
077        private static Boolean taskExecutorWarning = true;
078
079
080        private AsyncWebRequest asyncWebRequest;
081
082        private AsyncTaskExecutor taskExecutor = DEFAULT_TASK_EXECUTOR;
083
084        private volatile Object concurrentResult = RESULT_NONE;
085
086        private volatile Object[] concurrentResultContext;
087
088        /*
089         * Whether the concurrentResult is an error. If such errors remain unhandled, some
090         * Servlet containers will call AsyncListener#onError at the end, after the ASYNC
091         * and/or the ERROR dispatch (Boot's case), and we need to ignore those.
092         */
093        private volatile boolean errorHandlingInProgress;
094
095        private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<>();
096
097        private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors = new LinkedHashMap<>();
098
099
100        /**
101         * Package-private constructor.
102         * @see WebAsyncUtils#getAsyncManager(javax.servlet.ServletRequest)
103         * @see WebAsyncUtils#getAsyncManager(org.springframework.web.context.request.WebRequest)
104         */
105        WebAsyncManager() {
106        }
107
108
109        /**
110         * Configure the {@link AsyncWebRequest} to use. This property may be set
111         * more than once during a single request to accurately reflect the current
112         * state of the request (e.g. following a forward, request/response
113         * wrapping, etc). However, it should not be set while concurrent handling
114         * is in progress, i.e. while {@link #isConcurrentHandlingStarted()} is
115         * {@code true}.
116         * @param asyncWebRequest the web request to use
117         */
118        public void setAsyncWebRequest(AsyncWebRequest asyncWebRequest) {
119                Assert.notNull(asyncWebRequest, "AsyncWebRequest must not be null");
120                this.asyncWebRequest = asyncWebRequest;
121                this.asyncWebRequest.addCompletionHandler(() -> asyncWebRequest.removeAttribute(
122                                WebAsyncUtils.WEB_ASYNC_MANAGER_ATTRIBUTE, RequestAttributes.SCOPE_REQUEST));
123        }
124
125        /**
126         * Configure an AsyncTaskExecutor for use with concurrent processing via
127         * {@link #startCallableProcessing(Callable, Object...)}.
128         * <p>By default a {@link SimpleAsyncTaskExecutor} instance is used.
129         */
130        public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
131                this.taskExecutor = taskExecutor;
132        }
133
134        /**
135         * Whether the selected handler for the current request chose to handle the
136         * request asynchronously. A return value of "true" indicates concurrent
137         * handling is under way and the response will remain open. A return value
138         * of "false" means concurrent handling was either not started or possibly
139         * that it has completed and the request was dispatched for further
140         * processing of the concurrent result.
141         */
142        public boolean isConcurrentHandlingStarted() {
143                return (this.asyncWebRequest != null && this.asyncWebRequest.isAsyncStarted());
144        }
145
146        /**
147         * Whether a result value exists as a result of concurrent handling.
148         */
149        public boolean hasConcurrentResult() {
150                return (this.concurrentResult != RESULT_NONE);
151        }
152
153        /**
154         * Provides access to the result from concurrent handling.
155         * @return an Object, possibly an {@code Exception} or {@code Throwable} if
156         * concurrent handling raised one.
157         * @see #clearConcurrentResult()
158         */
159        public Object getConcurrentResult() {
160                return this.concurrentResult;
161        }
162
163        /**
164         * Provides access to additional processing context saved at the start of
165         * concurrent handling.
166         * @see #clearConcurrentResult()
167         */
168        public Object[] getConcurrentResultContext() {
169                return this.concurrentResultContext;
170        }
171
172        /**
173         * Get the {@link CallableProcessingInterceptor} registered under the given key.
174         * @param key the key
175         * @return the interceptor registered under that key, or {@code null} if none
176         */
177        @Nullable
178        public CallableProcessingInterceptor getCallableInterceptor(Object key) {
179                return this.callableInterceptors.get(key);
180        }
181
182        /**
183         * Get the {@link DeferredResultProcessingInterceptor} registered under the given key.
184         * @param key the key
185         * @return the interceptor registered under that key, or {@code null} if none
186         */
187        @Nullable
188        public DeferredResultProcessingInterceptor getDeferredResultInterceptor(Object key) {
189                return this.deferredResultInterceptors.get(key);
190        }
191
192        /**
193         * Register a {@link CallableProcessingInterceptor} under the given key.
194         * @param key the key
195         * @param interceptor the interceptor to register
196         */
197        public void registerCallableInterceptor(Object key, CallableProcessingInterceptor interceptor) {
198                Assert.notNull(key, "Key is required");
199                Assert.notNull(interceptor, "CallableProcessingInterceptor  is required");
200                this.callableInterceptors.put(key, interceptor);
201        }
202
203        /**
204         * Register a {@link CallableProcessingInterceptor} without a key.
205         * The key is derived from the class name and hashcode.
206         * @param interceptors one or more interceptors to register
207         */
208        public void registerCallableInterceptors(CallableProcessingInterceptor... interceptors) {
209                Assert.notNull(interceptors, "A CallableProcessingInterceptor is required");
210                for (CallableProcessingInterceptor interceptor : interceptors) {
211                        String key = interceptor.getClass().getName() + ":" + interceptor.hashCode();
212                        this.callableInterceptors.put(key, interceptor);
213                }
214        }
215
216        /**
217         * Register a {@link DeferredResultProcessingInterceptor} under the given key.
218         * @param key the key
219         * @param interceptor the interceptor to register
220         */
221        public void registerDeferredResultInterceptor(Object key, DeferredResultProcessingInterceptor interceptor) {
222                Assert.notNull(key, "Key is required");
223                Assert.notNull(interceptor, "DeferredResultProcessingInterceptor is required");
224                this.deferredResultInterceptors.put(key, interceptor);
225        }
226
227        /**
228         * Register one or more {@link DeferredResultProcessingInterceptor DeferredResultProcessingInterceptors} without a specified key.
229         * The default key is derived from the interceptor class name and hash code.
230         * @param interceptors one or more interceptors to register
231         */
232        public void registerDeferredResultInterceptors(DeferredResultProcessingInterceptor... interceptors) {
233                Assert.notNull(interceptors, "A DeferredResultProcessingInterceptor is required");
234                for (DeferredResultProcessingInterceptor interceptor : interceptors) {
235                        String key = interceptor.getClass().getName() + ":" + interceptor.hashCode();
236                        this.deferredResultInterceptors.put(key, interceptor);
237                }
238        }
239
240        /**
241         * Clear {@linkplain #getConcurrentResult() concurrentResult} and
242         * {@linkplain #getConcurrentResultContext() concurrentResultContext}.
243         */
244        public void clearConcurrentResult() {
245                synchronized (WebAsyncManager.this) {
246                        this.concurrentResult = RESULT_NONE;
247                        this.concurrentResultContext = null;
248                }
249        }
250
251        /**
252         * Start concurrent request processing and execute the given task with an
253         * {@link #setTaskExecutor(AsyncTaskExecutor) AsyncTaskExecutor}. The result
254         * from the task execution is saved and the request dispatched in order to
255         * resume processing of that result. If the task raises an Exception then
256         * the saved result will be the raised Exception.
257         * @param callable a unit of work to be executed asynchronously
258         * @param processingContext additional context to save that can be accessed
259         * via {@link #getConcurrentResultContext()}
260         * @throws Exception if concurrent processing failed to start
261         * @see #getConcurrentResult()
262         * @see #getConcurrentResultContext()
263         */
264        @SuppressWarnings({"rawtypes", "unchecked"})
265        public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception {
266                Assert.notNull(callable, "Callable must not be null");
267                startCallableProcessing(new WebAsyncTask(callable), processingContext);
268        }
269
270        /**
271         * Use the given {@link WebAsyncTask} to configure the task executor as well as
272         * the timeout value of the {@code AsyncWebRequest} before delegating to
273         * {@link #startCallableProcessing(Callable, Object...)}.
274         * @param webAsyncTask a WebAsyncTask containing the target {@code Callable}
275         * @param processingContext additional context to save that can be accessed
276         * via {@link #getConcurrentResultContext()}
277         * @throws Exception if concurrent processing failed to start
278         */
279        public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
280                        throws Exception {
281
282                Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
283                Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
284
285                Long timeout = webAsyncTask.getTimeout();
286                if (timeout != null) {
287                        this.asyncWebRequest.setTimeout(timeout);
288                }
289
290                AsyncTaskExecutor executor = webAsyncTask.getExecutor();
291                if (executor != null) {
292                        this.taskExecutor = executor;
293                }
294                else {
295                        logExecutorWarning();
296                }
297
298                List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
299                interceptors.add(webAsyncTask.getInterceptor());
300                interceptors.addAll(this.callableInterceptors.values());
301                interceptors.add(timeoutCallableInterceptor);
302
303                final Callable<?> callable = webAsyncTask.getCallable();
304                final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
305
306                this.asyncWebRequest.addTimeoutHandler(() -> {
307                        if (logger.isDebugEnabled()) {
308                                logger.debug("Async request timeout for " + formatRequestUri());
309                        }
310                        Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
311                        if (result != CallableProcessingInterceptor.RESULT_NONE) {
312                                setConcurrentResultAndDispatch(result);
313                        }
314                });
315
316                this.asyncWebRequest.addErrorHandler(ex -> {
317                        if (!this.errorHandlingInProgress) {
318                                if (logger.isDebugEnabled()) {
319                                        logger.debug("Async request error for " + formatRequestUri() + ": " + ex);
320                                }
321                                Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
322                                result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
323                                setConcurrentResultAndDispatch(result);
324                        }
325                });
326
327                this.asyncWebRequest.addCompletionHandler(() ->
328                                interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));
329
330                interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
331                startAsyncProcessing(processingContext);
332                try {
333                        Future<?> future = this.taskExecutor.submit(() -> {
334                                Object result = null;
335                                try {
336                                        interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
337                                        result = callable.call();
338                                }
339                                catch (Throwable ex) {
340                                        result = ex;
341                                }
342                                finally {
343                                        result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
344                                }
345                                setConcurrentResultAndDispatch(result);
346                        });
347                        interceptorChain.setTaskFuture(future);
348                }
349                catch (RejectedExecutionException ex) {
350                        Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
351                        setConcurrentResultAndDispatch(result);
352                        throw ex;
353                }
354        }
355
356        private void logExecutorWarning() {
357                if (taskExecutorWarning && logger.isWarnEnabled()) {
358                        synchronized (DEFAULT_TASK_EXECUTOR) {
359                                AsyncTaskExecutor executor = this.taskExecutor;
360                                if (taskExecutorWarning &&
361                                                (executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor)) {
362                                        String executorTypeName = executor.getClass().getSimpleName();
363                                        logger.warn("\n!!!\n" +
364                                                        "An Executor is required to handle java.util.concurrent.Callable return values.\n" +
365                                                        "Please, configure a TaskExecutor in the MVC config under \"async support\".\n" +
366                                                        "The " + executorTypeName + " currently in use is not suitable under load.\n" +
367                                                        "-------------------------------\n" +
368                                                        "Request URI: '" + formatRequestUri() + "'\n" +
369                                                        "!!!");
370                                        taskExecutorWarning = false;
371                                }
372                        }
373                }
374        }
375
376        private String formatRequestUri() {
377                HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class);
378                return request != null ? request.getRequestURI() : "servlet container";
379        }
380
381        private void setConcurrentResultAndDispatch(Object result) {
382                synchronized (WebAsyncManager.this) {
383                        if (this.concurrentResult != RESULT_NONE) {
384                                return;
385                        }
386                        this.concurrentResult = result;
387                        this.errorHandlingInProgress = (result instanceof Throwable);
388                }
389
390                if (this.asyncWebRequest.isAsyncComplete()) {
391                        if (logger.isDebugEnabled()) {
392                                logger.debug("Async result set but request already complete: " + formatRequestUri());
393                        }
394                        return;
395                }
396
397                if (logger.isDebugEnabled()) {
398                        boolean isError = result instanceof Throwable;
399                        logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
400                }
401                this.asyncWebRequest.dispatch();
402        }
403
404        /**
405         * Start concurrent request processing and initialize the given
406         * {@link DeferredResult} with a {@link DeferredResultHandler} that saves
407         * the result and dispatches the request to resume processing of that
408         * result. The {@code AsyncWebRequest} is also updated with a completion
409         * handler that expires the {@code DeferredResult} and a timeout handler
410         * assuming the {@code DeferredResult} has a default timeout result.
411         * @param deferredResult the DeferredResult instance to initialize
412         * @param processingContext additional context to save that can be accessed
413         * via {@link #getConcurrentResultContext()}
414         * @throws Exception if concurrent processing failed to start
415         * @see #getConcurrentResult()
416         * @see #getConcurrentResultContext()
417         */
418        public void startDeferredResultProcessing(
419                        final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
420
421                Assert.notNull(deferredResult, "DeferredResult must not be null");
422                Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
423
424                Long timeout = deferredResult.getTimeoutValue();
425                if (timeout != null) {
426                        this.asyncWebRequest.setTimeout(timeout);
427                }
428
429                List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
430                interceptors.add(deferredResult.getInterceptor());
431                interceptors.addAll(this.deferredResultInterceptors.values());
432                interceptors.add(timeoutDeferredResultInterceptor);
433
434                final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
435
436                this.asyncWebRequest.addTimeoutHandler(() -> {
437                        try {
438                                interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
439                        }
440                        catch (Throwable ex) {
441                                setConcurrentResultAndDispatch(ex);
442                        }
443                });
444
445                this.asyncWebRequest.addErrorHandler(ex -> {
446                        if (!this.errorHandlingInProgress) {
447                                try {
448                                        if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
449                                                return;
450                                        }
451                                        deferredResult.setErrorResult(ex);
452                                }
453                                catch (Throwable interceptorEx) {
454                                        setConcurrentResultAndDispatch(interceptorEx);
455                                }
456                        }
457                });
458
459                this.asyncWebRequest.addCompletionHandler(()
460                                -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));
461
462                interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
463                startAsyncProcessing(processingContext);
464
465                try {
466                        interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
467                        deferredResult.setResultHandler(result -> {
468                                result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
469                                setConcurrentResultAndDispatch(result);
470                        });
471                }
472                catch (Throwable ex) {
473                        setConcurrentResultAndDispatch(ex);
474                }
475        }
476
477        private void startAsyncProcessing(Object[] processingContext) {
478                synchronized (WebAsyncManager.this) {
479                        this.concurrentResult = RESULT_NONE;
480                        this.concurrentResultContext = processingContext;
481                        this.errorHandlingInProgress = false;
482                }
483                this.asyncWebRequest.startAsync();
484
485                if (logger.isDebugEnabled()) {
486                        logger.debug("Started async request");
487                }
488        }
489
490}