001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.context.request.async;
018
019import java.util.ArrayList;
020import java.util.LinkedHashMap;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.Callable;
024import java.util.concurrent.Future;
025import java.util.concurrent.RejectedExecutionException;
026import javax.servlet.http.HttpServletRequest;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030
031import org.springframework.core.task.AsyncTaskExecutor;
032import org.springframework.core.task.SimpleAsyncTaskExecutor;
033import org.springframework.util.Assert;
034import org.springframework.web.context.request.RequestAttributes;
035import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
036import org.springframework.web.util.UrlPathHelper;
037
038/**
039 * The central class for managing asynchronous request processing, mainly intended
040 * as an SPI and not typically used directly by application classes.
041 *
042 * <p>An async scenario starts with request processing as usual in a thread (T1).
043 * Concurrent request handling can be initiated by calling
044 * {@link #startCallableProcessing(Callable, Object...) startCallableProcessing} or
045 * {@link #startDeferredResultProcessing(DeferredResult, Object...) startDeferredResultProcessing},
046 * both of which produce a result in a separate thread (T2). The result is saved
047 * and the request dispatched to the container, to resume processing with the saved
048 * result in a third thread (T3). Within the dispatched thread (T3), the saved
049 * result can be accessed via {@link #getConcurrentResult()} or its presence
050 * detected via {@link #hasConcurrentResult()}.
051 *
052 * @author Rossen Stoyanchev
053 * @author Juergen Hoeller
054 * @since 3.2
055 * @see org.springframework.web.context.request.AsyncWebRequestInterceptor
056 * @see org.springframework.web.servlet.AsyncHandlerInterceptor
057 * @see org.springframework.web.filter.OncePerRequestFilter#shouldNotFilterAsyncDispatch
058 * @see org.springframework.web.filter.OncePerRequestFilter#isAsyncDispatch
059 */
060public final class WebAsyncManager {
061
062        private static final Object RESULT_NONE = new Object();
063
064        private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
065
066        private static final UrlPathHelper urlPathHelper = new UrlPathHelper();
067
068        private static final CallableProcessingInterceptor timeoutCallableInterceptor =
069                        new TimeoutCallableProcessingInterceptor();
070
071        private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
072                        new TimeoutDeferredResultProcessingInterceptor();
073
074
075        private AsyncWebRequest asyncWebRequest;
076
077        private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(this.getClass().getSimpleName());
078
079        private volatile Object concurrentResult = RESULT_NONE;
080
081        private volatile Object[] concurrentResultContext;
082
083        private final Map<Object, CallableProcessingInterceptor> callableInterceptors =
084                        new LinkedHashMap<Object, CallableProcessingInterceptor>();
085
086        private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors =
087                        new LinkedHashMap<Object, DeferredResultProcessingInterceptor>();
088
089
090        /**
091         * Package-private constructor.
092         * @see WebAsyncUtils#getAsyncManager(javax.servlet.ServletRequest)
093         * @see WebAsyncUtils#getAsyncManager(org.springframework.web.context.request.WebRequest)
094         */
095        WebAsyncManager() {
096        }
097
098
099        /**
100         * Configure the {@link AsyncWebRequest} to use. This property may be set
101         * more than once during a single request to accurately reflect the current
102         * state of the request (e.g. following a forward, request/response
103         * wrapping, etc). However, it should not be set while concurrent handling
104         * is in progress, i.e. while {@link #isConcurrentHandlingStarted()} is
105         * {@code true}.
106         * @param asyncWebRequest the web request to use
107         */
108        public void setAsyncWebRequest(final AsyncWebRequest asyncWebRequest) {
109                Assert.notNull(asyncWebRequest, "AsyncWebRequest must not be null");
110                this.asyncWebRequest = asyncWebRequest;
111                this.asyncWebRequest.addCompletionHandler(new Runnable() {
112                        @Override
113                        public void run() {
114                                asyncWebRequest.removeAttribute(WebAsyncUtils.WEB_ASYNC_MANAGER_ATTRIBUTE, RequestAttributes.SCOPE_REQUEST);
115                        }
116                });
117        }
118
119        /**
120         * Configure an AsyncTaskExecutor for use with concurrent processing via
121         * {@link #startCallableProcessing(Callable, Object...)}.
122         * <p>By default a {@link SimpleAsyncTaskExecutor} instance is used.
123         */
124        public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
125                this.taskExecutor = taskExecutor;
126        }
127
128        /**
129         * Whether the selected handler for the current request chose to handle the
130         * request asynchronously. A return value of "true" indicates concurrent
131         * handling is under way and the response will remain open. A return value
132         * of "false" means concurrent handling was either not started or possibly
133         * that it has completed and the request was dispatched for further
134         * processing of the concurrent result.
135         */
136        public boolean isConcurrentHandlingStarted() {
137                return (this.asyncWebRequest != null && this.asyncWebRequest.isAsyncStarted());
138        }
139
140        /**
141         * Whether a result value exists as a result of concurrent handling.
142         */
143        public boolean hasConcurrentResult() {
144                return (this.concurrentResult != RESULT_NONE);
145        }
146
147        /**
148         * Provides access to the result from concurrent handling.
149         * @return an Object, possibly an {@code Exception} or {@code Throwable} if
150         * concurrent handling raised one.
151         * @see #clearConcurrentResult()
152         */
153        public Object getConcurrentResult() {
154                return this.concurrentResult;
155        }
156
157        /**
158         * Provides access to additional processing context saved at the start of
159         * concurrent handling.
160         * @see #clearConcurrentResult()
161         */
162        public Object[] getConcurrentResultContext() {
163                return this.concurrentResultContext;
164        }
165
166        /**
167         * Get the {@link CallableProcessingInterceptor} registered under the given key.
168         * @param key the key
169         * @return the interceptor registered under that key, or {@code null} if none
170         */
171        public CallableProcessingInterceptor getCallableInterceptor(Object key) {
172                return this.callableInterceptors.get(key);
173        }
174
175        /**
176         * Get the {@link DeferredResultProcessingInterceptor} registered under the given key.
177         * @param key the key
178         * @return the interceptor registered under that key, or {@code null} if none
179         */
180        public DeferredResultProcessingInterceptor getDeferredResultInterceptor(Object key) {
181                return this.deferredResultInterceptors.get(key);
182        }
183
184        /**
185         * Register a {@link CallableProcessingInterceptor} under the given key.
186         * @param key the key
187         * @param interceptor the interceptor to register
188         */
189        public void registerCallableInterceptor(Object key, CallableProcessingInterceptor interceptor) {
190                Assert.notNull(key, "Key is required");
191                Assert.notNull(interceptor, "CallableProcessingInterceptor  is required");
192                this.callableInterceptors.put(key, interceptor);
193        }
194
195        /**
196         * Register a {@link CallableProcessingInterceptor} without a key.
197         * The key is derived from the class name and hashcode.
198         * @param interceptors one or more interceptors to register
199         */
200        public void registerCallableInterceptors(CallableProcessingInterceptor... interceptors) {
201                Assert.notNull(interceptors, "A CallableProcessingInterceptor is required");
202                for (CallableProcessingInterceptor interceptor : interceptors) {
203                        String key = interceptor.getClass().getName() + ":" + interceptor.hashCode();
204                        this.callableInterceptors.put(key, interceptor);
205                }
206        }
207
208        /**
209         * Register a {@link DeferredResultProcessingInterceptor} under the given key.
210         * @param key the key
211         * @param interceptor the interceptor to register
212         */
213        public void registerDeferredResultInterceptor(Object key, DeferredResultProcessingInterceptor interceptor) {
214                Assert.notNull(key, "Key is required");
215                Assert.notNull(interceptor, "DeferredResultProcessingInterceptor is required");
216                this.deferredResultInterceptors.put(key, interceptor);
217        }
218
219        /**
220         * Register one or more {@link DeferredResultProcessingInterceptor}s without a specified key.
221         * The default key is derived from the interceptor class name and hash code.
222         * @param interceptors one or more interceptors to register
223         */
224        public void registerDeferredResultInterceptors(DeferredResultProcessingInterceptor... interceptors) {
225                Assert.notNull(interceptors, "A DeferredResultProcessingInterceptor is required");
226                for (DeferredResultProcessingInterceptor interceptor : interceptors) {
227                        String key = interceptor.getClass().getName() + ":" + interceptor.hashCode();
228                        this.deferredResultInterceptors.put(key, interceptor);
229                }
230        }
231
232        /**
233         * Clear {@linkplain #getConcurrentResult() concurrentResult} and
234         * {@linkplain #getConcurrentResultContext() concurrentResultContext}.
235         */
236        public void clearConcurrentResult() {
237                synchronized (WebAsyncManager.this) {
238                        this.concurrentResult = RESULT_NONE;
239                        this.concurrentResultContext = null;
240                }
241        }
242
243        /**
244         * Start concurrent request processing and execute the given task with an
245         * {@link #setTaskExecutor(AsyncTaskExecutor) AsyncTaskExecutor}. The result
246         * from the task execution is saved and the request dispatched in order to
247         * resume processing of that result. If the task raises an Exception then
248         * the saved result will be the raised Exception.
249         * @param callable a unit of work to be executed asynchronously
250         * @param processingContext additional context to save that can be accessed
251         * via {@link #getConcurrentResultContext()}
252         * @throws Exception if concurrent processing failed to start
253         * @see #getConcurrentResult()
254         * @see #getConcurrentResultContext()
255         */
256        @SuppressWarnings({"unchecked", "rawtypes"})
257        public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception {
258                Assert.notNull(callable, "Callable must not be null");
259                startCallableProcessing(new WebAsyncTask(callable), processingContext);
260        }
261
262        /**
263         * Use the given {@link WebAsyncTask} to configure the task executor as well as
264         * the timeout value of the {@code AsyncWebRequest} before delegating to
265         * {@link #startCallableProcessing(Callable, Object...)}.
266         * @param webAsyncTask a WebAsyncTask containing the target {@code Callable}
267         * @param processingContext additional context to save that can be accessed
268         * via {@link #getConcurrentResultContext()}
269         * @throws Exception if concurrent processing failed to start
270         */
271        public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
272                        throws Exception {
273
274                Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
275                Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
276
277                Long timeout = webAsyncTask.getTimeout();
278                if (timeout != null) {
279                        this.asyncWebRequest.setTimeout(timeout);
280                }
281
282                AsyncTaskExecutor executor = webAsyncTask.getExecutor();
283                if (executor != null) {
284                        this.taskExecutor = executor;
285                }
286
287                List<CallableProcessingInterceptor> interceptors = new ArrayList<CallableProcessingInterceptor>();
288                interceptors.add(webAsyncTask.getInterceptor());
289                interceptors.addAll(this.callableInterceptors.values());
290                interceptors.add(timeoutCallableInterceptor);
291
292                final Callable<?> callable = webAsyncTask.getCallable();
293                final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
294
295                this.asyncWebRequest.addTimeoutHandler(new Runnable() {
296                        @Override
297                        public void run() {
298                                logger.debug("Processing timeout");
299                                Object result = interceptorChain.triggerAfterTimeout(asyncWebRequest, callable);
300                                if (result != CallableProcessingInterceptor.RESULT_NONE) {
301                                        setConcurrentResultAndDispatch(result);
302                                }
303                        }
304                });
305
306                if (this.asyncWebRequest instanceof StandardServletAsyncWebRequest) {
307                        ((StandardServletAsyncWebRequest) this.asyncWebRequest).setErrorHandler(
308                                        new StandardServletAsyncWebRequest.ErrorHandler() {
309                                                @Override
310                                                public void handle(Throwable ex) {
311                                                        setConcurrentResultAndDispatch(ex);
312                                                }
313                                        });
314                }
315
316                this.asyncWebRequest.addCompletionHandler(new Runnable() {
317                        @Override
318                        public void run() {
319                                interceptorChain.triggerAfterCompletion(asyncWebRequest, callable);
320                        }
321                });
322
323                interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
324                startAsyncProcessing(processingContext);
325                try {
326                        Future<?> future = this.taskExecutor.submit(new Runnable() {
327                                @Override
328                                public void run() {
329                                        Object result = null;
330                                        try {
331                                                interceptorChain.applyPreProcess(asyncWebRequest, callable);
332                                                result = callable.call();
333                                        }
334                                        catch (Throwable ex) {
335                                                result = ex;
336                                        }
337                                        finally {
338                                                result = interceptorChain.applyPostProcess(asyncWebRequest, callable, result);
339                                        }
340                                        setConcurrentResultAndDispatch(result);
341                                }
342                        });
343                        interceptorChain.setTaskFuture(future);
344                }
345                catch (RejectedExecutionException ex) {
346                        Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
347                        setConcurrentResultAndDispatch(result);
348                        throw ex;
349                }
350        }
351
352        private void setConcurrentResultAndDispatch(Object result) {
353                synchronized (WebAsyncManager.this) {
354                        if (this.concurrentResult != RESULT_NONE) {
355                                return;
356                        }
357                        this.concurrentResult = result;
358                }
359
360                if (this.asyncWebRequest.isAsyncComplete()) {
361                        logger.error("Could not complete async processing due to timeout or network error");
362                        return;
363                }
364
365                if (logger.isDebugEnabled()) {
366                        logger.debug("Concurrent result value [" + this.concurrentResult +
367                                        "] - dispatching request to resume processing");
368                }
369                this.asyncWebRequest.dispatch();
370        }
371
372        /**
373         * Start concurrent request processing and initialize the given
374         * {@link DeferredResult} with a {@link DeferredResultHandler} that saves
375         * the result and dispatches the request to resume processing of that
376         * result. The {@code AsyncWebRequest} is also updated with a completion
377         * handler that expires the {@code DeferredResult} and a timeout handler
378         * assuming the {@code DeferredResult} has a default timeout result.
379         * @param deferredResult the DeferredResult instance to initialize
380         * @param processingContext additional context to save that can be accessed
381         * via {@link #getConcurrentResultContext()}
382         * @throws Exception if concurrent processing failed to start
383         * @see #getConcurrentResult()
384         * @see #getConcurrentResultContext()
385         */
386        public void startDeferredResultProcessing(
387                        final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
388
389                Assert.notNull(deferredResult, "DeferredResult must not be null");
390                Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
391
392                Long timeout = deferredResult.getTimeoutValue();
393                if (timeout != null) {
394                        this.asyncWebRequest.setTimeout(timeout);
395                }
396
397                List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<DeferredResultProcessingInterceptor>();
398                interceptors.add(deferredResult.getInterceptor());
399                interceptors.addAll(this.deferredResultInterceptors.values());
400                interceptors.add(timeoutDeferredResultInterceptor);
401
402                final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
403
404                this.asyncWebRequest.addTimeoutHandler(new Runnable() {
405                        @Override
406                        public void run() {
407                                try {
408                                        interceptorChain.triggerAfterTimeout(asyncWebRequest, deferredResult);
409                                }
410                                catch (Throwable ex) {
411                                        setConcurrentResultAndDispatch(ex);
412                                }
413                        }
414                });
415
416                if (this.asyncWebRequest instanceof StandardServletAsyncWebRequest) {
417                        ((StandardServletAsyncWebRequest) this.asyncWebRequest).setErrorHandler(
418                                        new StandardServletAsyncWebRequest.ErrorHandler() {
419                                                @Override
420                                                public void handle(Throwable ex) {
421                                                        deferredResult.setErrorResult(ex);
422                                                }
423                                        });
424                }
425
426                this.asyncWebRequest.addCompletionHandler(new Runnable() {
427                        @Override
428                        public void run() {
429                                interceptorChain.triggerAfterCompletion(asyncWebRequest, deferredResult);
430                        }
431                });
432
433                interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
434                startAsyncProcessing(processingContext);
435
436                try {
437                        interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
438                        deferredResult.setResultHandler(new DeferredResultHandler() {
439                                @Override
440                                public void handleResult(Object result) {
441                                        result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result);
442                                        setConcurrentResultAndDispatch(result);
443                                }
444                        });
445                }
446                catch (Throwable ex) {
447                        setConcurrentResultAndDispatch(ex);
448                }
449        }
450
451        private void startAsyncProcessing(Object[] processingContext) {
452                synchronized (WebAsyncManager.this) {
453                        this.concurrentResult = RESULT_NONE;
454                        this.concurrentResultContext = processingContext;
455                }
456                this.asyncWebRequest.startAsync();
457
458                if (logger.isDebugEnabled()) {
459                        HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class);
460                        String requestUri = urlPathHelper.getRequestUri(request);
461                        logger.debug("Concurrent handling starting for " + request.getMethod() + " [" + requestUri + "]");
462                }
463        }
464
465}