001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.servlet.mvc.method.annotation; 018 019import java.io.IOException; 020import java.io.OutputStream; 021import java.nio.charset.StandardCharsets; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.function.Consumer; 025 026import javax.servlet.ServletRequest; 027import javax.servlet.http.HttpServletResponse; 028 029import org.springframework.core.MethodParameter; 030import org.springframework.core.ReactiveAdapterRegistry; 031import org.springframework.core.ResolvableType; 032import org.springframework.core.task.TaskExecutor; 033import org.springframework.http.HttpHeaders; 034import org.springframework.http.HttpStatus; 035import org.springframework.http.MediaType; 036import org.springframework.http.ResponseEntity; 037import org.springframework.http.converter.HttpMessageConverter; 038import org.springframework.http.converter.StringHttpMessageConverter; 039import org.springframework.http.server.ServerHttpResponse; 040import org.springframework.http.server.ServletServerHttpResponse; 041import org.springframework.lang.Nullable; 042import org.springframework.util.Assert; 043import org.springframework.web.accept.ContentNegotiationManager; 044import org.springframework.web.context.request.NativeWebRequest; 045import org.springframework.web.context.request.async.DeferredResult; 046import org.springframework.web.context.request.async.WebAsyncUtils; 047import org.springframework.web.filter.ShallowEtagHeaderFilter; 048import org.springframework.web.method.support.HandlerMethodReturnValueHandler; 049import org.springframework.web.method.support.ModelAndViewContainer; 050 051/** 052 * Handler for return values of type {@link ResponseBodyEmitter} and sub-classes 053 * such as {@link SseEmitter} including the same types wrapped with 054 * {@link ResponseEntity}. 055 * 056 * <p>As of 5.0 also supports reactive return value types for any reactive 057 * library with registered adapters in {@link ReactiveAdapterRegistry}. 058 * 059 * @author Rossen Stoyanchev 060 * @since 4.2 061 */ 062public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler { 063 064 private final List<HttpMessageConverter<?>> messageConverters; 065 066 private final List<HttpMessageConverter<?>> sseMessageConverters; 067 068 private final ReactiveTypeHandler reactiveHandler; 069 070 071 /** 072 * Simple constructor with reactive type support based on a default instance of 073 * {@link ReactiveAdapterRegistry}, 074 * {@link org.springframework.core.task.SyncTaskExecutor}, and 075 * {@link ContentNegotiationManager} with an Accept header strategy. 076 */ 077 public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters) { 078 Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty"); 079 this.messageConverters = messageConverters; 080 this.sseMessageConverters = initSseConverters(messageConverters); 081 this.reactiveHandler = new ReactiveTypeHandler(); 082 } 083 084 /** 085 * Complete constructor with pluggable "reactive" type support. 086 * @param messageConverters converters to write emitted objects with 087 * @param registry for reactive return value type support 088 * @param executor for blocking I/O writes of items emitted from reactive types 089 * @param manager for detecting streaming media types 090 * @since 5.0 091 */ 092 public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters, 093 ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) { 094 095 Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty"); 096 this.messageConverters = messageConverters; 097 this.sseMessageConverters = initSseConverters(messageConverters); 098 this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager); 099 } 100 101 private static List<HttpMessageConverter<?>> initSseConverters(List<HttpMessageConverter<?>> converters) { 102 for (HttpMessageConverter<?> converter : converters) { 103 if (converter.canWrite(String.class, MediaType.TEXT_PLAIN)) { 104 return converters; 105 } 106 } 107 List<HttpMessageConverter<?>> result = new ArrayList<>(converters.size() + 1); 108 result.add(new StringHttpMessageConverter(StandardCharsets.UTF_8)); 109 result.addAll(converters); 110 return result; 111 } 112 113 114 @Override 115 public boolean supportsReturnType(MethodParameter returnType) { 116 Class<?> bodyType = ResponseEntity.class.isAssignableFrom(returnType.getParameterType()) ? 117 ResolvableType.forMethodParameter(returnType).getGeneric().resolve() : 118 returnType.getParameterType(); 119 120 return (bodyType != null && (ResponseBodyEmitter.class.isAssignableFrom(bodyType) || 121 this.reactiveHandler.isReactiveType(bodyType))); 122 } 123 124 @Override 125 @SuppressWarnings("resource") 126 public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, 127 ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { 128 129 if (returnValue == null) { 130 mavContainer.setRequestHandled(true); 131 return; 132 } 133 134 HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class); 135 Assert.state(response != null, "No HttpServletResponse"); 136 ServerHttpResponse outputMessage = new ServletServerHttpResponse(response); 137 138 if (returnValue instanceof ResponseEntity) { 139 ResponseEntity<?> responseEntity = (ResponseEntity<?>) returnValue; 140 response.setStatus(responseEntity.getStatusCodeValue()); 141 outputMessage.getHeaders().putAll(responseEntity.getHeaders()); 142 returnValue = responseEntity.getBody(); 143 returnType = returnType.nested(); 144 if (returnValue == null) { 145 mavContainer.setRequestHandled(true); 146 outputMessage.flush(); 147 return; 148 } 149 } 150 151 ServletRequest request = webRequest.getNativeRequest(ServletRequest.class); 152 Assert.state(request != null, "No ServletRequest"); 153 154 ResponseBodyEmitter emitter; 155 if (returnValue instanceof ResponseBodyEmitter) { 156 emitter = (ResponseBodyEmitter) returnValue; 157 } 158 else { 159 emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest); 160 if (emitter == null) { 161 // Not streaming: write headers without committing response.. 162 outputMessage.getHeaders().forEach((headerName, headerValues) -> { 163 for (String headerValue : headerValues) { 164 response.addHeader(headerName, headerValue); 165 } 166 }); 167 return; 168 } 169 } 170 emitter.extendResponse(outputMessage); 171 172 // At this point we know we're streaming.. 173 ShallowEtagHeaderFilter.disableContentCaching(request); 174 175 // Wrap the response to ignore further header changes 176 // Headers will be flushed at the first write 177 outputMessage = new StreamingServletServerHttpResponse(outputMessage); 178 179 HttpMessageConvertingHandler handler; 180 try { 181 DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout()); 182 WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer); 183 handler = new HttpMessageConvertingHandler(outputMessage, deferredResult); 184 } 185 catch (Throwable ex) { 186 emitter.initializeWithError(ex); 187 throw ex; 188 } 189 190 emitter.initialize(handler); 191 } 192 193 194 /** 195 * ResponseBodyEmitter.Handler that writes with HttpMessageConverter's. 196 */ 197 private class HttpMessageConvertingHandler implements ResponseBodyEmitter.Handler { 198 199 private final ServerHttpResponse outputMessage; 200 201 private final DeferredResult<?> deferredResult; 202 203 public HttpMessageConvertingHandler(ServerHttpResponse outputMessage, DeferredResult<?> deferredResult) { 204 this.outputMessage = outputMessage; 205 this.deferredResult = deferredResult; 206 } 207 208 @Override 209 public void send(Object data, @Nullable MediaType mediaType) throws IOException { 210 sendInternal(data, mediaType); 211 } 212 213 @SuppressWarnings("unchecked") 214 private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException { 215 for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) { 216 if (converter.canWrite(data.getClass(), mediaType)) { 217 ((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage); 218 this.outputMessage.flush(); 219 return; 220 } 221 } 222 throw new IllegalArgumentException("No suitable converter for " + data.getClass()); 223 } 224 225 @Override 226 public void complete() { 227 try { 228 this.outputMessage.flush(); 229 this.deferredResult.setResult(null); 230 } 231 catch (IOException ex) { 232 this.deferredResult.setErrorResult(ex); 233 } 234 } 235 236 @Override 237 public void completeWithError(Throwable failure) { 238 this.deferredResult.setErrorResult(failure); 239 } 240 241 @Override 242 public void onTimeout(Runnable callback) { 243 this.deferredResult.onTimeout(callback); 244 } 245 246 @Override 247 public void onError(Consumer<Throwable> callback) { 248 this.deferredResult.onError(callback); 249 } 250 251 @Override 252 public void onCompletion(Runnable callback) { 253 this.deferredResult.onCompletion(callback); 254 } 255 } 256 257 258 /** 259 * Wrap to silently ignore header changes HttpMessageConverter's that would 260 * otherwise cause HttpHeaders to raise exceptions. 261 */ 262 private static class StreamingServletServerHttpResponse implements ServerHttpResponse { 263 264 private final ServerHttpResponse delegate; 265 266 private final HttpHeaders mutableHeaders = new HttpHeaders(); 267 268 public StreamingServletServerHttpResponse(ServerHttpResponse delegate) { 269 this.delegate = delegate; 270 this.mutableHeaders.putAll(delegate.getHeaders()); 271 } 272 273 @Override 274 public void setStatusCode(HttpStatus status) { 275 this.delegate.setStatusCode(status); 276 } 277 278 @Override 279 public HttpHeaders getHeaders() { 280 return this.mutableHeaders; 281 } 282 283 @Override 284 public OutputStream getBody() throws IOException { 285 return this.delegate.getBody(); 286 } 287 288 @Override 289 public void flush() throws IOException { 290 this.delegate.flush(); 291 } 292 293 @Override 294 public void close() { 295 this.delegate.close(); 296 } 297 } 298 299}