001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.servlet.mvc.method.annotation; 018 019import java.io.IOException; 020import java.util.LinkedHashSet; 021import java.util.Set; 022import java.util.function.Consumer; 023 024import org.springframework.http.MediaType; 025import org.springframework.http.server.ServerHttpResponse; 026import org.springframework.lang.Nullable; 027import org.springframework.util.Assert; 028import org.springframework.util.ObjectUtils; 029 030/** 031 * A controller method return value type for asynchronous request processing 032 * where one or more objects are written to the response. 033 * 034 * <p>While {@link org.springframework.web.context.request.async.DeferredResult} 035 * is used to produce a single result, a {@code ResponseBodyEmitter} can be used 036 * to send multiple objects where each object is written with a compatible 037 * {@link org.springframework.http.converter.HttpMessageConverter}. 038 * 039 * <p>Supported as a return type on its own as well as within a 040 * {@link org.springframework.http.ResponseEntity}. 041 * 042 * <pre> 043 * @RequestMapping(value="/stream", method=RequestMethod.GET) 044 * public ResponseBodyEmitter handle() { 045 * ResponseBodyEmitter emitter = new ResponseBodyEmitter(); 046 * // Pass the emitter to another component... 047 * return emitter; 048 * } 049 * 050 * // in another thread 051 * emitter.send(foo1); 052 * 053 * // and again 054 * emitter.send(foo2); 055 * 056 * // and done 057 * emitter.complete(); 058 * </pre> 059 * 060 * @author Rossen Stoyanchev 061 * @author Juergen Hoeller 062 * @since 4.2 063 */ 064public class ResponseBodyEmitter { 065 066 @Nullable 067 private final Long timeout; 068 069 @Nullable 070 private Handler handler; 071 072 /** Store send data before handler is initialized. */ 073 private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(8); 074 075 /** Store successful completion before the handler is initialized. */ 076 private boolean complete; 077 078 /** Store an error before the handler is initialized. */ 079 @Nullable 080 private Throwable failure; 081 082 /** 083 * After an I/O error, we don't call {@link #completeWithError} directly but 084 * wait for the Servlet container to call us via {@code AsyncListener#onError} 085 * on a container thread at which point we call completeWithError. 086 * This flag is used to ignore further calls to complete or completeWithError 087 * that may come for example from an application try-catch block on the 088 * thread of the I/O error. 089 */ 090 private boolean sendFailed; 091 092 private final DefaultCallback timeoutCallback = new DefaultCallback(); 093 094 private final ErrorCallback errorCallback = new ErrorCallback(); 095 096 private final DefaultCallback completionCallback = new DefaultCallback(); 097 098 099 /** 100 * Create a new ResponseBodyEmitter instance. 101 */ 102 public ResponseBodyEmitter() { 103 this.timeout = null; 104 } 105 106 /** 107 * Create a ResponseBodyEmitter with a custom timeout value. 108 * <p>By default not set in which case the default configured in the MVC 109 * Java Config or the MVC namespace is used, or if that's not set, then the 110 * timeout depends on the default of the underlying server. 111 * @param timeout the timeout value in milliseconds 112 */ 113 public ResponseBodyEmitter(Long timeout) { 114 this.timeout = timeout; 115 } 116 117 118 /** 119 * Return the configured timeout value, if any. 120 */ 121 @Nullable 122 public Long getTimeout() { 123 return this.timeout; 124 } 125 126 127 synchronized void initialize(Handler handler) throws IOException { 128 this.handler = handler; 129 130 try { 131 for (DataWithMediaType sendAttempt : this.earlySendAttempts) { 132 sendInternal(sendAttempt.getData(), sendAttempt.getMediaType()); 133 } 134 } 135 finally { 136 this.earlySendAttempts.clear(); 137 } 138 139 if (this.complete) { 140 if (this.failure != null) { 141 this.handler.completeWithError(this.failure); 142 } 143 else { 144 this.handler.complete(); 145 } 146 } 147 else { 148 this.handler.onTimeout(this.timeoutCallback); 149 this.handler.onError(this.errorCallback); 150 this.handler.onCompletion(this.completionCallback); 151 } 152 } 153 154 synchronized void initializeWithError(Throwable ex) { 155 this.complete = true; 156 this.failure = ex; 157 this.earlySendAttempts.clear(); 158 this.errorCallback.accept(ex); 159 } 160 161 /** 162 * Invoked after the response is updated with the status code and headers, 163 * if the ResponseBodyEmitter is wrapped in a ResponseEntity, but before the 164 * response is committed, i.e. before the response body has been written to. 165 * <p>The default implementation is empty. 166 */ 167 protected void extendResponse(ServerHttpResponse outputMessage) { 168 } 169 170 /** 171 * Write the given object to the response. 172 * <p>If any exception occurs a dispatch is made back to the app server where 173 * Spring MVC will pass the exception through its exception handling mechanism. 174 * <p><strong>Note:</strong> if the send fails with an IOException, you do 175 * not need to call {@link #completeWithError(Throwable)} in order to clean 176 * up. Instead the Servlet container creates a notification that results in a 177 * dispatch where Spring MVC invokes exception resolvers and completes 178 * processing. 179 * @param object the object to write 180 * @throws IOException raised when an I/O error occurs 181 * @throws java.lang.IllegalStateException wraps any other errors 182 */ 183 public void send(Object object) throws IOException { 184 send(object, null); 185 } 186 187 /** 188 * Overloaded variant of {@link #send(Object)} that also accepts a MediaType 189 * hint for how to serialize the given Object. 190 * @param object the object to write 191 * @param mediaType a MediaType hint for selecting an HttpMessageConverter 192 * @throws IOException raised when an I/O error occurs 193 * @throws java.lang.IllegalStateException wraps any other errors 194 */ 195 public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { 196 Assert.state(!this.complete, 197 "ResponseBodyEmitter has already completed" + 198 (this.failure != null ? " with error: " + this.failure : "")); 199 sendInternal(object, mediaType); 200 } 201 202 private void sendInternal(Object object, @Nullable MediaType mediaType) throws IOException { 203 if (this.handler != null) { 204 try { 205 this.handler.send(object, mediaType); 206 } 207 catch (IOException ex) { 208 this.sendFailed = true; 209 throw ex; 210 } 211 catch (Throwable ex) { 212 this.sendFailed = true; 213 throw new IllegalStateException("Failed to send " + object, ex); 214 } 215 } 216 else { 217 this.earlySendAttempts.add(new DataWithMediaType(object, mediaType)); 218 } 219 } 220 221 /** 222 * Complete request processing by performing a dispatch into the servlet 223 * container, where Spring MVC is invoked once more, and completes the 224 * request processing lifecycle. 225 * <p><strong>Note:</strong> this method should be called by the application 226 * to complete request processing. It should not be used after container 227 * related events such as an error while {@link #send(Object) sending}. 228 */ 229 public synchronized void complete() { 230 // Ignore, after send failure 231 if (this.sendFailed) { 232 return; 233 } 234 this.complete = true; 235 if (this.handler != null) { 236 this.handler.complete(); 237 } 238 } 239 240 /** 241 * Complete request processing with an error. 242 * <p>A dispatch is made into the app server where Spring MVC will pass the 243 * exception through its exception handling mechanism. Note however that 244 * at this stage of request processing, the response is committed and the 245 * response status can no longer be changed. 246 * <p><strong>Note:</strong> this method should be called by the application 247 * to complete request processing with an error. It should not be used after 248 * container related events such as an error while 249 * {@link #send(Object) sending}. 250 */ 251 public synchronized void completeWithError(Throwable ex) { 252 // Ignore, after send failure 253 if (this.sendFailed) { 254 return; 255 } 256 this.complete = true; 257 this.failure = ex; 258 if (this.handler != null) { 259 this.handler.completeWithError(ex); 260 } 261 } 262 263 /** 264 * Register code to invoke when the async request times out. This method is 265 * called from a container thread when an async request times out. 266 */ 267 public synchronized void onTimeout(Runnable callback) { 268 this.timeoutCallback.setDelegate(callback); 269 } 270 271 /** 272 * Register code to invoke for an error during async request processing. 273 * This method is called from a container thread when an error occurred 274 * while processing an async request. 275 * @since 5.0 276 */ 277 public synchronized void onError(Consumer<Throwable> callback) { 278 this.errorCallback.setDelegate(callback); 279 } 280 281 /** 282 * Register code to invoke when the async request completes. This method is 283 * called from a container thread when an async request completed for any 284 * reason including timeout and network error. This method is useful for 285 * detecting that a {@code ResponseBodyEmitter} instance is no longer usable. 286 */ 287 public synchronized void onCompletion(Runnable callback) { 288 this.completionCallback.setDelegate(callback); 289 } 290 291 292 @Override 293 public String toString() { 294 return "ResponseBodyEmitter@" + ObjectUtils.getIdentityHexString(this); 295 } 296 297 298 /** 299 * Contract to handle the sending of event data, the completion of event 300 * sending, and the registration of callbacks to be invoked in case of 301 * timeout, error, and completion for any reason (including from the 302 * container side). 303 */ 304 interface Handler { 305 306 void send(Object data, @Nullable MediaType mediaType) throws IOException; 307 308 void complete(); 309 310 void completeWithError(Throwable failure); 311 312 void onTimeout(Runnable callback); 313 314 void onError(Consumer<Throwable> callback); 315 316 void onCompletion(Runnable callback); 317 } 318 319 320 /** 321 * A simple holder of data to be written along with a MediaType hint for 322 * selecting a message converter to write with. 323 */ 324 public static class DataWithMediaType { 325 326 private final Object data; 327 328 @Nullable 329 private final MediaType mediaType; 330 331 public DataWithMediaType(Object data, @Nullable MediaType mediaType) { 332 this.data = data; 333 this.mediaType = mediaType; 334 } 335 336 public Object getData() { 337 return this.data; 338 } 339 340 @Nullable 341 public MediaType getMediaType() { 342 return this.mediaType; 343 } 344 } 345 346 347 private class DefaultCallback implements Runnable { 348 349 @Nullable 350 private Runnable delegate; 351 352 public void setDelegate(Runnable delegate) { 353 this.delegate = delegate; 354 } 355 356 @Override 357 public void run() { 358 ResponseBodyEmitter.this.complete = true; 359 if (this.delegate != null) { 360 this.delegate.run(); 361 } 362 } 363 } 364 365 366 private class ErrorCallback implements Consumer<Throwable> { 367 368 @Nullable 369 private Consumer<Throwable> delegate; 370 371 public void setDelegate(Consumer<Throwable> callback) { 372 this.delegate = callback; 373 } 374 375 @Override 376 public void accept(Throwable t) { 377 ResponseBodyEmitter.this.complete = true; 378 if (this.delegate != null) { 379 this.delegate.accept(t); 380 } 381 } 382 } 383 384}