001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.context.request.async; 018 019import java.util.ArrayList; 020import java.util.LinkedHashMap; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.Callable; 024import java.util.concurrent.Future; 025import java.util.concurrent.RejectedExecutionException; 026import javax.servlet.http.HttpServletRequest; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030 031import org.springframework.core.task.AsyncTaskExecutor; 032import org.springframework.core.task.SimpleAsyncTaskExecutor; 033import org.springframework.util.Assert; 034import org.springframework.web.context.request.RequestAttributes; 035import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler; 036import org.springframework.web.util.UrlPathHelper; 037 038/** 039 * The central class for managing asynchronous request processing, mainly intended 040 * as an SPI and not typically used directly by application classes. 041 * 042 * <p>An async scenario starts with request processing as usual in a thread (T1). 043 * Concurrent request handling can be initiated by calling 044 * {@link #startCallableProcessing(Callable, Object...) startCallableProcessing} or 045 * {@link #startDeferredResultProcessing(DeferredResult, Object...) startDeferredResultProcessing}, 046 * both of which produce a result in a separate thread (T2). The result is saved 047 * and the request dispatched to the container, to resume processing with the saved 048 * result in a third thread (T3). Within the dispatched thread (T3), the saved 049 * result can be accessed via {@link #getConcurrentResult()} or its presence 050 * detected via {@link #hasConcurrentResult()}. 051 * 052 * @author Rossen Stoyanchev 053 * @author Juergen Hoeller 054 * @since 3.2 055 * @see org.springframework.web.context.request.AsyncWebRequestInterceptor 056 * @see org.springframework.web.servlet.AsyncHandlerInterceptor 057 * @see org.springframework.web.filter.OncePerRequestFilter#shouldNotFilterAsyncDispatch 058 * @see org.springframework.web.filter.OncePerRequestFilter#isAsyncDispatch 059 */ 060public final class WebAsyncManager { 061 062 private static final Object RESULT_NONE = new Object(); 063 064 private static final Log logger = LogFactory.getLog(WebAsyncManager.class); 065 066 private static final UrlPathHelper urlPathHelper = new UrlPathHelper(); 067 068 private static final CallableProcessingInterceptor timeoutCallableInterceptor = 069 new TimeoutCallableProcessingInterceptor(); 070 071 private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = 072 new TimeoutDeferredResultProcessingInterceptor(); 073 074 075 private AsyncWebRequest asyncWebRequest; 076 077 private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(this.getClass().getSimpleName()); 078 079 private volatile Object concurrentResult = RESULT_NONE; 080 081 private volatile Object[] concurrentResultContext; 082 083 private final Map<Object, CallableProcessingInterceptor> callableInterceptors = 084 new LinkedHashMap<Object, CallableProcessingInterceptor>(); 085 086 private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors = 087 new LinkedHashMap<Object, DeferredResultProcessingInterceptor>(); 088 089 090 /** 091 * Package-private constructor. 092 * @see WebAsyncUtils#getAsyncManager(javax.servlet.ServletRequest) 093 * @see WebAsyncUtils#getAsyncManager(org.springframework.web.context.request.WebRequest) 094 */ 095 WebAsyncManager() { 096 } 097 098 099 /** 100 * Configure the {@link AsyncWebRequest} to use. This property may be set 101 * more than once during a single request to accurately reflect the current 102 * state of the request (e.g. following a forward, request/response 103 * wrapping, etc). However, it should not be set while concurrent handling 104 * is in progress, i.e. while {@link #isConcurrentHandlingStarted()} is 105 * {@code true}. 106 * @param asyncWebRequest the web request to use 107 */ 108 public void setAsyncWebRequest(final AsyncWebRequest asyncWebRequest) { 109 Assert.notNull(asyncWebRequest, "AsyncWebRequest must not be null"); 110 this.asyncWebRequest = asyncWebRequest; 111 this.asyncWebRequest.addCompletionHandler(new Runnable() { 112 @Override 113 public void run() { 114 asyncWebRequest.removeAttribute(WebAsyncUtils.WEB_ASYNC_MANAGER_ATTRIBUTE, RequestAttributes.SCOPE_REQUEST); 115 } 116 }); 117 } 118 119 /** 120 * Configure an AsyncTaskExecutor for use with concurrent processing via 121 * {@link #startCallableProcessing(Callable, Object...)}. 122 * <p>By default a {@link SimpleAsyncTaskExecutor} instance is used. 123 */ 124 public void setTaskExecutor(AsyncTaskExecutor taskExecutor) { 125 this.taskExecutor = taskExecutor; 126 } 127 128 /** 129 * Whether the selected handler for the current request chose to handle the 130 * request asynchronously. A return value of "true" indicates concurrent 131 * handling is under way and the response will remain open. A return value 132 * of "false" means concurrent handling was either not started or possibly 133 * that it has completed and the request was dispatched for further 134 * processing of the concurrent result. 135 */ 136 public boolean isConcurrentHandlingStarted() { 137 return (this.asyncWebRequest != null && this.asyncWebRequest.isAsyncStarted()); 138 } 139 140 /** 141 * Whether a result value exists as a result of concurrent handling. 142 */ 143 public boolean hasConcurrentResult() { 144 return (this.concurrentResult != RESULT_NONE); 145 } 146 147 /** 148 * Provides access to the result from concurrent handling. 149 * @return an Object, possibly an {@code Exception} or {@code Throwable} if 150 * concurrent handling raised one. 151 * @see #clearConcurrentResult() 152 */ 153 public Object getConcurrentResult() { 154 return this.concurrentResult; 155 } 156 157 /** 158 * Provides access to additional processing context saved at the start of 159 * concurrent handling. 160 * @see #clearConcurrentResult() 161 */ 162 public Object[] getConcurrentResultContext() { 163 return this.concurrentResultContext; 164 } 165 166 /** 167 * Get the {@link CallableProcessingInterceptor} registered under the given key. 168 * @param key the key 169 * @return the interceptor registered under that key, or {@code null} if none 170 */ 171 public CallableProcessingInterceptor getCallableInterceptor(Object key) { 172 return this.callableInterceptors.get(key); 173 } 174 175 /** 176 * Get the {@link DeferredResultProcessingInterceptor} registered under the given key. 177 * @param key the key 178 * @return the interceptor registered under that key, or {@code null} if none 179 */ 180 public DeferredResultProcessingInterceptor getDeferredResultInterceptor(Object key) { 181 return this.deferredResultInterceptors.get(key); 182 } 183 184 /** 185 * Register a {@link CallableProcessingInterceptor} under the given key. 186 * @param key the key 187 * @param interceptor the interceptor to register 188 */ 189 public void registerCallableInterceptor(Object key, CallableProcessingInterceptor interceptor) { 190 Assert.notNull(key, "Key is required"); 191 Assert.notNull(interceptor, "CallableProcessingInterceptor is required"); 192 this.callableInterceptors.put(key, interceptor); 193 } 194 195 /** 196 * Register a {@link CallableProcessingInterceptor} without a key. 197 * The key is derived from the class name and hashcode. 198 * @param interceptors one or more interceptors to register 199 */ 200 public void registerCallableInterceptors(CallableProcessingInterceptor... interceptors) { 201 Assert.notNull(interceptors, "A CallableProcessingInterceptor is required"); 202 for (CallableProcessingInterceptor interceptor : interceptors) { 203 String key = interceptor.getClass().getName() + ":" + interceptor.hashCode(); 204 this.callableInterceptors.put(key, interceptor); 205 } 206 } 207 208 /** 209 * Register a {@link DeferredResultProcessingInterceptor} under the given key. 210 * @param key the key 211 * @param interceptor the interceptor to register 212 */ 213 public void registerDeferredResultInterceptor(Object key, DeferredResultProcessingInterceptor interceptor) { 214 Assert.notNull(key, "Key is required"); 215 Assert.notNull(interceptor, "DeferredResultProcessingInterceptor is required"); 216 this.deferredResultInterceptors.put(key, interceptor); 217 } 218 219 /** 220 * Register one or more {@link DeferredResultProcessingInterceptor}s without a specified key. 221 * The default key is derived from the interceptor class name and hash code. 222 * @param interceptors one or more interceptors to register 223 */ 224 public void registerDeferredResultInterceptors(DeferredResultProcessingInterceptor... interceptors) { 225 Assert.notNull(interceptors, "A DeferredResultProcessingInterceptor is required"); 226 for (DeferredResultProcessingInterceptor interceptor : interceptors) { 227 String key = interceptor.getClass().getName() + ":" + interceptor.hashCode(); 228 this.deferredResultInterceptors.put(key, interceptor); 229 } 230 } 231 232 /** 233 * Clear {@linkplain #getConcurrentResult() concurrentResult} and 234 * {@linkplain #getConcurrentResultContext() concurrentResultContext}. 235 */ 236 public void clearConcurrentResult() { 237 synchronized (WebAsyncManager.this) { 238 this.concurrentResult = RESULT_NONE; 239 this.concurrentResultContext = null; 240 } 241 } 242 243 /** 244 * Start concurrent request processing and execute the given task with an 245 * {@link #setTaskExecutor(AsyncTaskExecutor) AsyncTaskExecutor}. The result 246 * from the task execution is saved and the request dispatched in order to 247 * resume processing of that result. If the task raises an Exception then 248 * the saved result will be the raised Exception. 249 * @param callable a unit of work to be executed asynchronously 250 * @param processingContext additional context to save that can be accessed 251 * via {@link #getConcurrentResultContext()} 252 * @throws Exception if concurrent processing failed to start 253 * @see #getConcurrentResult() 254 * @see #getConcurrentResultContext() 255 */ 256 @SuppressWarnings({"unchecked", "rawtypes"}) 257 public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception { 258 Assert.notNull(callable, "Callable must not be null"); 259 startCallableProcessing(new WebAsyncTask(callable), processingContext); 260 } 261 262 /** 263 * Use the given {@link WebAsyncTask} to configure the task executor as well as 264 * the timeout value of the {@code AsyncWebRequest} before delegating to 265 * {@link #startCallableProcessing(Callable, Object...)}. 266 * @param webAsyncTask a WebAsyncTask containing the target {@code Callable} 267 * @param processingContext additional context to save that can be accessed 268 * via {@link #getConcurrentResultContext()} 269 * @throws Exception if concurrent processing failed to start 270 */ 271 public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) 272 throws Exception { 273 274 Assert.notNull(webAsyncTask, "WebAsyncTask must not be null"); 275 Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); 276 277 Long timeout = webAsyncTask.getTimeout(); 278 if (timeout != null) { 279 this.asyncWebRequest.setTimeout(timeout); 280 } 281 282 AsyncTaskExecutor executor = webAsyncTask.getExecutor(); 283 if (executor != null) { 284 this.taskExecutor = executor; 285 } 286 287 List<CallableProcessingInterceptor> interceptors = new ArrayList<CallableProcessingInterceptor>(); 288 interceptors.add(webAsyncTask.getInterceptor()); 289 interceptors.addAll(this.callableInterceptors.values()); 290 interceptors.add(timeoutCallableInterceptor); 291 292 final Callable<?> callable = webAsyncTask.getCallable(); 293 final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors); 294 295 this.asyncWebRequest.addTimeoutHandler(new Runnable() { 296 @Override 297 public void run() { 298 logger.debug("Processing timeout"); 299 Object result = interceptorChain.triggerAfterTimeout(asyncWebRequest, callable); 300 if (result != CallableProcessingInterceptor.RESULT_NONE) { 301 setConcurrentResultAndDispatch(result); 302 } 303 } 304 }); 305 306 if (this.asyncWebRequest instanceof StandardServletAsyncWebRequest) { 307 ((StandardServletAsyncWebRequest) this.asyncWebRequest).setErrorHandler( 308 new StandardServletAsyncWebRequest.ErrorHandler() { 309 @Override 310 public void handle(Throwable ex) { 311 setConcurrentResultAndDispatch(ex); 312 } 313 }); 314 } 315 316 this.asyncWebRequest.addCompletionHandler(new Runnable() { 317 @Override 318 public void run() { 319 interceptorChain.triggerAfterCompletion(asyncWebRequest, callable); 320 } 321 }); 322 323 interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable); 324 startAsyncProcessing(processingContext); 325 try { 326 Future<?> future = this.taskExecutor.submit(new Runnable() { 327 @Override 328 public void run() { 329 Object result = null; 330 try { 331 interceptorChain.applyPreProcess(asyncWebRequest, callable); 332 result = callable.call(); 333 } 334 catch (Throwable ex) { 335 result = ex; 336 } 337 finally { 338 result = interceptorChain.applyPostProcess(asyncWebRequest, callable, result); 339 } 340 setConcurrentResultAndDispatch(result); 341 } 342 }); 343 interceptorChain.setTaskFuture(future); 344 } 345 catch (RejectedExecutionException ex) { 346 Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex); 347 setConcurrentResultAndDispatch(result); 348 throw ex; 349 } 350 } 351 352 private void setConcurrentResultAndDispatch(Object result) { 353 synchronized (WebAsyncManager.this) { 354 if (this.concurrentResult != RESULT_NONE) { 355 return; 356 } 357 this.concurrentResult = result; 358 } 359 360 if (this.asyncWebRequest.isAsyncComplete()) { 361 logger.error("Could not complete async processing due to timeout or network error"); 362 return; 363 } 364 365 if (logger.isDebugEnabled()) { 366 logger.debug("Concurrent result value [" + this.concurrentResult + 367 "] - dispatching request to resume processing"); 368 } 369 this.asyncWebRequest.dispatch(); 370 } 371 372 /** 373 * Start concurrent request processing and initialize the given 374 * {@link DeferredResult} with a {@link DeferredResultHandler} that saves 375 * the result and dispatches the request to resume processing of that 376 * result. The {@code AsyncWebRequest} is also updated with a completion 377 * handler that expires the {@code DeferredResult} and a timeout handler 378 * assuming the {@code DeferredResult} has a default timeout result. 379 * @param deferredResult the DeferredResult instance to initialize 380 * @param processingContext additional context to save that can be accessed 381 * via {@link #getConcurrentResultContext()} 382 * @throws Exception if concurrent processing failed to start 383 * @see #getConcurrentResult() 384 * @see #getConcurrentResultContext() 385 */ 386 public void startDeferredResultProcessing( 387 final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { 388 389 Assert.notNull(deferredResult, "DeferredResult must not be null"); 390 Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); 391 392 Long timeout = deferredResult.getTimeoutValue(); 393 if (timeout != null) { 394 this.asyncWebRequest.setTimeout(timeout); 395 } 396 397 List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<DeferredResultProcessingInterceptor>(); 398 interceptors.add(deferredResult.getInterceptor()); 399 interceptors.addAll(this.deferredResultInterceptors.values()); 400 interceptors.add(timeoutDeferredResultInterceptor); 401 402 final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); 403 404 this.asyncWebRequest.addTimeoutHandler(new Runnable() { 405 @Override 406 public void run() { 407 try { 408 interceptorChain.triggerAfterTimeout(asyncWebRequest, deferredResult); 409 } 410 catch (Throwable ex) { 411 setConcurrentResultAndDispatch(ex); 412 } 413 } 414 }); 415 416 if (this.asyncWebRequest instanceof StandardServletAsyncWebRequest) { 417 ((StandardServletAsyncWebRequest) this.asyncWebRequest).setErrorHandler( 418 new StandardServletAsyncWebRequest.ErrorHandler() { 419 @Override 420 public void handle(Throwable ex) { 421 deferredResult.setErrorResult(ex); 422 } 423 }); 424 } 425 426 this.asyncWebRequest.addCompletionHandler(new Runnable() { 427 @Override 428 public void run() { 429 interceptorChain.triggerAfterCompletion(asyncWebRequest, deferredResult); 430 } 431 }); 432 433 interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult); 434 startAsyncProcessing(processingContext); 435 436 try { 437 interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult); 438 deferredResult.setResultHandler(new DeferredResultHandler() { 439 @Override 440 public void handleResult(Object result) { 441 result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result); 442 setConcurrentResultAndDispatch(result); 443 } 444 }); 445 } 446 catch (Throwable ex) { 447 setConcurrentResultAndDispatch(ex); 448 } 449 } 450 451 private void startAsyncProcessing(Object[] processingContext) { 452 synchronized (WebAsyncManager.this) { 453 this.concurrentResult = RESULT_NONE; 454 this.concurrentResultContext = processingContext; 455 } 456 this.asyncWebRequest.startAsync(); 457 458 if (logger.isDebugEnabled()) { 459 HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class); 460 String requestUri = urlPathHelper.getRequestUri(request); 461 logger.debug("Concurrent handling starting for " + request.getMethod() + " [" + requestUri + "]"); 462 } 463 } 464 465}