001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.context.request.async; 018 019import java.util.ArrayList; 020import java.util.LinkedHashMap; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.Callable; 024import java.util.concurrent.Future; 025import java.util.concurrent.RejectedExecutionException; 026 027import javax.servlet.http.HttpServletRequest; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import org.springframework.core.task.AsyncTaskExecutor; 033import org.springframework.core.task.SimpleAsyncTaskExecutor; 034import org.springframework.core.task.SyncTaskExecutor; 035import org.springframework.lang.Nullable; 036import org.springframework.util.Assert; 037import org.springframework.web.context.request.RequestAttributes; 038import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler; 039 040/** 041 * The central class for managing asynchronous request processing, mainly intended 042 * as an SPI and not typically used directly by application classes. 043 * 044 * <p>An async scenario starts with request processing as usual in a thread (T1). 045 * Concurrent request handling can be initiated by calling 046 * {@link #startCallableProcessing(Callable, Object...) startCallableProcessing} or 047 * {@link #startDeferredResultProcessing(DeferredResult, Object...) startDeferredResultProcessing}, 048 * both of which produce a result in a separate thread (T2). The result is saved 049 * and the request dispatched to the container, to resume processing with the saved 050 * result in a third thread (T3). Within the dispatched thread (T3), the saved 051 * result can be accessed via {@link #getConcurrentResult()} or its presence 052 * detected via {@link #hasConcurrentResult()}. 053 * 054 * @author Rossen Stoyanchev 055 * @author Juergen Hoeller 056 * @since 3.2 057 * @see org.springframework.web.context.request.AsyncWebRequestInterceptor 058 * @see org.springframework.web.servlet.AsyncHandlerInterceptor 059 * @see org.springframework.web.filter.OncePerRequestFilter#shouldNotFilterAsyncDispatch 060 * @see org.springframework.web.filter.OncePerRequestFilter#isAsyncDispatch 061 */ 062public final class WebAsyncManager { 063 064 private static final Object RESULT_NONE = new Object(); 065 066 private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = 067 new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName()); 068 069 private static final Log logger = LogFactory.getLog(WebAsyncManager.class); 070 071 private static final CallableProcessingInterceptor timeoutCallableInterceptor = 072 new TimeoutCallableProcessingInterceptor(); 073 074 private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = 075 new TimeoutDeferredResultProcessingInterceptor(); 076 077 private static Boolean taskExecutorWarning = true; 078 079 080 private AsyncWebRequest asyncWebRequest; 081 082 private AsyncTaskExecutor taskExecutor = DEFAULT_TASK_EXECUTOR; 083 084 private volatile Object concurrentResult = RESULT_NONE; 085 086 private volatile Object[] concurrentResultContext; 087 088 /* 089 * Whether the concurrentResult is an error. If such errors remain unhandled, some 090 * Servlet containers will call AsyncListener#onError at the end, after the ASYNC 091 * and/or the ERROR dispatch (Boot's case), and we need to ignore those. 092 */ 093 private volatile boolean errorHandlingInProgress; 094 095 private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<>(); 096 097 private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors = new LinkedHashMap<>(); 098 099 100 /** 101 * Package-private constructor. 102 * @see WebAsyncUtils#getAsyncManager(javax.servlet.ServletRequest) 103 * @see WebAsyncUtils#getAsyncManager(org.springframework.web.context.request.WebRequest) 104 */ 105 WebAsyncManager() { 106 } 107 108 109 /** 110 * Configure the {@link AsyncWebRequest} to use. This property may be set 111 * more than once during a single request to accurately reflect the current 112 * state of the request (e.g. following a forward, request/response 113 * wrapping, etc). However, it should not be set while concurrent handling 114 * is in progress, i.e. while {@link #isConcurrentHandlingStarted()} is 115 * {@code true}. 116 * @param asyncWebRequest the web request to use 117 */ 118 public void setAsyncWebRequest(AsyncWebRequest asyncWebRequest) { 119 Assert.notNull(asyncWebRequest, "AsyncWebRequest must not be null"); 120 this.asyncWebRequest = asyncWebRequest; 121 this.asyncWebRequest.addCompletionHandler(() -> asyncWebRequest.removeAttribute( 122 WebAsyncUtils.WEB_ASYNC_MANAGER_ATTRIBUTE, RequestAttributes.SCOPE_REQUEST)); 123 } 124 125 /** 126 * Configure an AsyncTaskExecutor for use with concurrent processing via 127 * {@link #startCallableProcessing(Callable, Object...)}. 128 * <p>By default a {@link SimpleAsyncTaskExecutor} instance is used. 129 */ 130 public void setTaskExecutor(AsyncTaskExecutor taskExecutor) { 131 this.taskExecutor = taskExecutor; 132 } 133 134 /** 135 * Whether the selected handler for the current request chose to handle the 136 * request asynchronously. A return value of "true" indicates concurrent 137 * handling is under way and the response will remain open. A return value 138 * of "false" means concurrent handling was either not started or possibly 139 * that it has completed and the request was dispatched for further 140 * processing of the concurrent result. 141 */ 142 public boolean isConcurrentHandlingStarted() { 143 return (this.asyncWebRequest != null && this.asyncWebRequest.isAsyncStarted()); 144 } 145 146 /** 147 * Whether a result value exists as a result of concurrent handling. 148 */ 149 public boolean hasConcurrentResult() { 150 return (this.concurrentResult != RESULT_NONE); 151 } 152 153 /** 154 * Provides access to the result from concurrent handling. 155 * @return an Object, possibly an {@code Exception} or {@code Throwable} if 156 * concurrent handling raised one. 157 * @see #clearConcurrentResult() 158 */ 159 public Object getConcurrentResult() { 160 return this.concurrentResult; 161 } 162 163 /** 164 * Provides access to additional processing context saved at the start of 165 * concurrent handling. 166 * @see #clearConcurrentResult() 167 */ 168 public Object[] getConcurrentResultContext() { 169 return this.concurrentResultContext; 170 } 171 172 /** 173 * Get the {@link CallableProcessingInterceptor} registered under the given key. 174 * @param key the key 175 * @return the interceptor registered under that key, or {@code null} if none 176 */ 177 @Nullable 178 public CallableProcessingInterceptor getCallableInterceptor(Object key) { 179 return this.callableInterceptors.get(key); 180 } 181 182 /** 183 * Get the {@link DeferredResultProcessingInterceptor} registered under the given key. 184 * @param key the key 185 * @return the interceptor registered under that key, or {@code null} if none 186 */ 187 @Nullable 188 public DeferredResultProcessingInterceptor getDeferredResultInterceptor(Object key) { 189 return this.deferredResultInterceptors.get(key); 190 } 191 192 /** 193 * Register a {@link CallableProcessingInterceptor} under the given key. 194 * @param key the key 195 * @param interceptor the interceptor to register 196 */ 197 public void registerCallableInterceptor(Object key, CallableProcessingInterceptor interceptor) { 198 Assert.notNull(key, "Key is required"); 199 Assert.notNull(interceptor, "CallableProcessingInterceptor is required"); 200 this.callableInterceptors.put(key, interceptor); 201 } 202 203 /** 204 * Register a {@link CallableProcessingInterceptor} without a key. 205 * The key is derived from the class name and hashcode. 206 * @param interceptors one or more interceptors to register 207 */ 208 public void registerCallableInterceptors(CallableProcessingInterceptor... interceptors) { 209 Assert.notNull(interceptors, "A CallableProcessingInterceptor is required"); 210 for (CallableProcessingInterceptor interceptor : interceptors) { 211 String key = interceptor.getClass().getName() + ":" + interceptor.hashCode(); 212 this.callableInterceptors.put(key, interceptor); 213 } 214 } 215 216 /** 217 * Register a {@link DeferredResultProcessingInterceptor} under the given key. 218 * @param key the key 219 * @param interceptor the interceptor to register 220 */ 221 public void registerDeferredResultInterceptor(Object key, DeferredResultProcessingInterceptor interceptor) { 222 Assert.notNull(key, "Key is required"); 223 Assert.notNull(interceptor, "DeferredResultProcessingInterceptor is required"); 224 this.deferredResultInterceptors.put(key, interceptor); 225 } 226 227 /** 228 * Register one or more {@link DeferredResultProcessingInterceptor DeferredResultProcessingInterceptors} without a specified key. 229 * The default key is derived from the interceptor class name and hash code. 230 * @param interceptors one or more interceptors to register 231 */ 232 public void registerDeferredResultInterceptors(DeferredResultProcessingInterceptor... interceptors) { 233 Assert.notNull(interceptors, "A DeferredResultProcessingInterceptor is required"); 234 for (DeferredResultProcessingInterceptor interceptor : interceptors) { 235 String key = interceptor.getClass().getName() + ":" + interceptor.hashCode(); 236 this.deferredResultInterceptors.put(key, interceptor); 237 } 238 } 239 240 /** 241 * Clear {@linkplain #getConcurrentResult() concurrentResult} and 242 * {@linkplain #getConcurrentResultContext() concurrentResultContext}. 243 */ 244 public void clearConcurrentResult() { 245 synchronized (WebAsyncManager.this) { 246 this.concurrentResult = RESULT_NONE; 247 this.concurrentResultContext = null; 248 } 249 } 250 251 /** 252 * Start concurrent request processing and execute the given task with an 253 * {@link #setTaskExecutor(AsyncTaskExecutor) AsyncTaskExecutor}. The result 254 * from the task execution is saved and the request dispatched in order to 255 * resume processing of that result. If the task raises an Exception then 256 * the saved result will be the raised Exception. 257 * @param callable a unit of work to be executed asynchronously 258 * @param processingContext additional context to save that can be accessed 259 * via {@link #getConcurrentResultContext()} 260 * @throws Exception if concurrent processing failed to start 261 * @see #getConcurrentResult() 262 * @see #getConcurrentResultContext() 263 */ 264 @SuppressWarnings({"rawtypes", "unchecked"}) 265 public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception { 266 Assert.notNull(callable, "Callable must not be null"); 267 startCallableProcessing(new WebAsyncTask(callable), processingContext); 268 } 269 270 /** 271 * Use the given {@link WebAsyncTask} to configure the task executor as well as 272 * the timeout value of the {@code AsyncWebRequest} before delegating to 273 * {@link #startCallableProcessing(Callable, Object...)}. 274 * @param webAsyncTask a WebAsyncTask containing the target {@code Callable} 275 * @param processingContext additional context to save that can be accessed 276 * via {@link #getConcurrentResultContext()} 277 * @throws Exception if concurrent processing failed to start 278 */ 279 public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) 280 throws Exception { 281 282 Assert.notNull(webAsyncTask, "WebAsyncTask must not be null"); 283 Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); 284 285 Long timeout = webAsyncTask.getTimeout(); 286 if (timeout != null) { 287 this.asyncWebRequest.setTimeout(timeout); 288 } 289 290 AsyncTaskExecutor executor = webAsyncTask.getExecutor(); 291 if (executor != null) { 292 this.taskExecutor = executor; 293 } 294 else { 295 logExecutorWarning(); 296 } 297 298 List<CallableProcessingInterceptor> interceptors = new ArrayList<>(); 299 interceptors.add(webAsyncTask.getInterceptor()); 300 interceptors.addAll(this.callableInterceptors.values()); 301 interceptors.add(timeoutCallableInterceptor); 302 303 final Callable<?> callable = webAsyncTask.getCallable(); 304 final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors); 305 306 this.asyncWebRequest.addTimeoutHandler(() -> { 307 if (logger.isDebugEnabled()) { 308 logger.debug("Async request timeout for " + formatRequestUri()); 309 } 310 Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable); 311 if (result != CallableProcessingInterceptor.RESULT_NONE) { 312 setConcurrentResultAndDispatch(result); 313 } 314 }); 315 316 this.asyncWebRequest.addErrorHandler(ex -> { 317 if (!this.errorHandlingInProgress) { 318 if (logger.isDebugEnabled()) { 319 logger.debug("Async request error for " + formatRequestUri() + ": " + ex); 320 } 321 Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex); 322 result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex); 323 setConcurrentResultAndDispatch(result); 324 } 325 }); 326 327 this.asyncWebRequest.addCompletionHandler(() -> 328 interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable)); 329 330 interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable); 331 startAsyncProcessing(processingContext); 332 try { 333 Future<?> future = this.taskExecutor.submit(() -> { 334 Object result = null; 335 try { 336 interceptorChain.applyPreProcess(this.asyncWebRequest, callable); 337 result = callable.call(); 338 } 339 catch (Throwable ex) { 340 result = ex; 341 } 342 finally { 343 result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result); 344 } 345 setConcurrentResultAndDispatch(result); 346 }); 347 interceptorChain.setTaskFuture(future); 348 } 349 catch (RejectedExecutionException ex) { 350 Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex); 351 setConcurrentResultAndDispatch(result); 352 throw ex; 353 } 354 } 355 356 private void logExecutorWarning() { 357 if (taskExecutorWarning && logger.isWarnEnabled()) { 358 synchronized (DEFAULT_TASK_EXECUTOR) { 359 AsyncTaskExecutor executor = this.taskExecutor; 360 if (taskExecutorWarning && 361 (executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor)) { 362 String executorTypeName = executor.getClass().getSimpleName(); 363 logger.warn("\n!!!\n" + 364 "An Executor is required to handle java.util.concurrent.Callable return values.\n" + 365 "Please, configure a TaskExecutor in the MVC config under \"async support\".\n" + 366 "The " + executorTypeName + " currently in use is not suitable under load.\n" + 367 "-------------------------------\n" + 368 "Request URI: '" + formatRequestUri() + "'\n" + 369 "!!!"); 370 taskExecutorWarning = false; 371 } 372 } 373 } 374 } 375 376 private String formatRequestUri() { 377 HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class); 378 return request != null ? request.getRequestURI() : "servlet container"; 379 } 380 381 private void setConcurrentResultAndDispatch(Object result) { 382 synchronized (WebAsyncManager.this) { 383 if (this.concurrentResult != RESULT_NONE) { 384 return; 385 } 386 this.concurrentResult = result; 387 this.errorHandlingInProgress = (result instanceof Throwable); 388 } 389 390 if (this.asyncWebRequest.isAsyncComplete()) { 391 if (logger.isDebugEnabled()) { 392 logger.debug("Async result set but request already complete: " + formatRequestUri()); 393 } 394 return; 395 } 396 397 if (logger.isDebugEnabled()) { 398 boolean isError = result instanceof Throwable; 399 logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri()); 400 } 401 this.asyncWebRequest.dispatch(); 402 } 403 404 /** 405 * Start concurrent request processing and initialize the given 406 * {@link DeferredResult} with a {@link DeferredResultHandler} that saves 407 * the result and dispatches the request to resume processing of that 408 * result. The {@code AsyncWebRequest} is also updated with a completion 409 * handler that expires the {@code DeferredResult} and a timeout handler 410 * assuming the {@code DeferredResult} has a default timeout result. 411 * @param deferredResult the DeferredResult instance to initialize 412 * @param processingContext additional context to save that can be accessed 413 * via {@link #getConcurrentResultContext()} 414 * @throws Exception if concurrent processing failed to start 415 * @see #getConcurrentResult() 416 * @see #getConcurrentResultContext() 417 */ 418 public void startDeferredResultProcessing( 419 final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { 420 421 Assert.notNull(deferredResult, "DeferredResult must not be null"); 422 Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); 423 424 Long timeout = deferredResult.getTimeoutValue(); 425 if (timeout != null) { 426 this.asyncWebRequest.setTimeout(timeout); 427 } 428 429 List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>(); 430 interceptors.add(deferredResult.getInterceptor()); 431 interceptors.addAll(this.deferredResultInterceptors.values()); 432 interceptors.add(timeoutDeferredResultInterceptor); 433 434 final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); 435 436 this.asyncWebRequest.addTimeoutHandler(() -> { 437 try { 438 interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult); 439 } 440 catch (Throwable ex) { 441 setConcurrentResultAndDispatch(ex); 442 } 443 }); 444 445 this.asyncWebRequest.addErrorHandler(ex -> { 446 if (!this.errorHandlingInProgress) { 447 try { 448 if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) { 449 return; 450 } 451 deferredResult.setErrorResult(ex); 452 } 453 catch (Throwable interceptorEx) { 454 setConcurrentResultAndDispatch(interceptorEx); 455 } 456 } 457 }); 458 459 this.asyncWebRequest.addCompletionHandler(() 460 -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult)); 461 462 interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult); 463 startAsyncProcessing(processingContext); 464 465 try { 466 interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult); 467 deferredResult.setResultHandler(result -> { 468 result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result); 469 setConcurrentResultAndDispatch(result); 470 }); 471 } 472 catch (Throwable ex) { 473 setConcurrentResultAndDispatch(ex); 474 } 475 } 476 477 private void startAsyncProcessing(Object[] processingContext) { 478 synchronized (WebAsyncManager.this) { 479 this.concurrentResult = RESULT_NONE; 480 this.concurrentResultContext = processingContext; 481 this.errorHandlingInProgress = false; 482 } 483 this.asyncWebRequest.startAsync(); 484 485 if (logger.isDebugEnabled()) { 486 logger.debug("Started async request"); 487 } 488 } 489 490}