001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.servlet.mvc.method.annotation;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.nio.charset.StandardCharsets;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.function.Consumer;
025
026import javax.servlet.ServletRequest;
027import javax.servlet.http.HttpServletResponse;
028
029import org.springframework.core.MethodParameter;
030import org.springframework.core.ReactiveAdapterRegistry;
031import org.springframework.core.ResolvableType;
032import org.springframework.core.task.TaskExecutor;
033import org.springframework.http.HttpHeaders;
034import org.springframework.http.HttpStatus;
035import org.springframework.http.MediaType;
036import org.springframework.http.ResponseEntity;
037import org.springframework.http.converter.HttpMessageConverter;
038import org.springframework.http.converter.StringHttpMessageConverter;
039import org.springframework.http.server.ServerHttpResponse;
040import org.springframework.http.server.ServletServerHttpResponse;
041import org.springframework.lang.Nullable;
042import org.springframework.util.Assert;
043import org.springframework.web.accept.ContentNegotiationManager;
044import org.springframework.web.context.request.NativeWebRequest;
045import org.springframework.web.context.request.async.DeferredResult;
046import org.springframework.web.context.request.async.WebAsyncUtils;
047import org.springframework.web.filter.ShallowEtagHeaderFilter;
048import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
049import org.springframework.web.method.support.ModelAndViewContainer;
050
051/**
052 * Handler for return values of type {@link ResponseBodyEmitter} and sub-classes
053 * such as {@link SseEmitter} including the same types wrapped with
054 * {@link ResponseEntity}.
055 *
056 * <p>As of 5.0 also supports reactive return value types for any reactive
057 * library with registered adapters in {@link ReactiveAdapterRegistry}.
058 *
059 * @author Rossen Stoyanchev
060 * @since 4.2
061 */
062public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
063
064        private final List<HttpMessageConverter<?>> messageConverters;
065
066        private final List<HttpMessageConverter<?>> sseMessageConverters;
067
068        private final ReactiveTypeHandler reactiveHandler;
069
070
071        /**
072         * Simple constructor with reactive type support based on a default instance of
073         * {@link ReactiveAdapterRegistry},
074         * {@link org.springframework.core.task.SyncTaskExecutor}, and
075         * {@link ContentNegotiationManager} with an Accept header strategy.
076         */
077        public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters) {
078                Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
079                this.messageConverters = messageConverters;
080                this.sseMessageConverters = initSseConverters(messageConverters);
081                this.reactiveHandler = new ReactiveTypeHandler();
082        }
083
084        /**
085         * Complete constructor with pluggable "reactive" type support.
086         * @param messageConverters converters to write emitted objects with
087         * @param registry for reactive return value type support
088         * @param executor for blocking I/O writes of items emitted from reactive types
089         * @param manager for detecting streaming media types
090         * @since 5.0
091         */
092        public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters,
093                        ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) {
094
095                Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
096                this.messageConverters = messageConverters;
097                this.sseMessageConverters = initSseConverters(messageConverters);
098                this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager);
099        }
100
101        private static List<HttpMessageConverter<?>> initSseConverters(List<HttpMessageConverter<?>> converters) {
102                for (HttpMessageConverter<?> converter : converters) {
103                        if (converter.canWrite(String.class, MediaType.TEXT_PLAIN)) {
104                                return converters;
105                        }
106                }
107                List<HttpMessageConverter<?>> result = new ArrayList<>(converters.size() + 1);
108                result.add(new StringHttpMessageConverter(StandardCharsets.UTF_8));
109                result.addAll(converters);
110                return result;
111        }
112
113
114        @Override
115        public boolean supportsReturnType(MethodParameter returnType) {
116                Class<?> bodyType = ResponseEntity.class.isAssignableFrom(returnType.getParameterType()) ?
117                                ResolvableType.forMethodParameter(returnType).getGeneric().resolve() :
118                                returnType.getParameterType();
119
120                return (bodyType != null && (ResponseBodyEmitter.class.isAssignableFrom(bodyType) ||
121                                this.reactiveHandler.isReactiveType(bodyType)));
122        }
123
124        @Override
125        @SuppressWarnings("resource")
126        public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
127                        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
128
129                if (returnValue == null) {
130                        mavContainer.setRequestHandled(true);
131                        return;
132                }
133
134                HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
135                Assert.state(response != null, "No HttpServletResponse");
136                ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
137
138                if (returnValue instanceof ResponseEntity) {
139                        ResponseEntity<?> responseEntity = (ResponseEntity<?>) returnValue;
140                        response.setStatus(responseEntity.getStatusCodeValue());
141                        outputMessage.getHeaders().putAll(responseEntity.getHeaders());
142                        returnValue = responseEntity.getBody();
143                        returnType = returnType.nested();
144                        if (returnValue == null) {
145                                mavContainer.setRequestHandled(true);
146                                outputMessage.flush();
147                                return;
148                        }
149                }
150
151                ServletRequest request = webRequest.getNativeRequest(ServletRequest.class);
152                Assert.state(request != null, "No ServletRequest");
153
154                ResponseBodyEmitter emitter;
155                if (returnValue instanceof ResponseBodyEmitter) {
156                        emitter = (ResponseBodyEmitter) returnValue;
157                }
158                else {
159                        emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
160                        if (emitter == null) {
161                                // Not streaming: write headers without committing response..
162                                outputMessage.getHeaders().forEach((headerName, headerValues) -> {
163                                        for (String headerValue : headerValues) {
164                                                response.addHeader(headerName, headerValue);
165                                        }
166                                });
167                                return;
168                        }
169                }
170                emitter.extendResponse(outputMessage);
171
172                // At this point we know we're streaming..
173                ShallowEtagHeaderFilter.disableContentCaching(request);
174
175                // Wrap the response to ignore further header changes
176                // Headers will be flushed at the first write
177                outputMessage = new StreamingServletServerHttpResponse(outputMessage);
178
179                HttpMessageConvertingHandler handler;
180                try {
181                        DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
182                        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
183                        handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
184                }
185                catch (Throwable ex) {
186                        emitter.initializeWithError(ex);
187                        throw ex;
188                }
189
190                emitter.initialize(handler);
191        }
192
193
194        /**
195         * ResponseBodyEmitter.Handler that writes with HttpMessageConverter's.
196         */
197        private class HttpMessageConvertingHandler implements ResponseBodyEmitter.Handler {
198
199                private final ServerHttpResponse outputMessage;
200
201                private final DeferredResult<?> deferredResult;
202
203                public HttpMessageConvertingHandler(ServerHttpResponse outputMessage, DeferredResult<?> deferredResult) {
204                        this.outputMessage = outputMessage;
205                        this.deferredResult = deferredResult;
206                }
207
208                @Override
209                public void send(Object data, @Nullable MediaType mediaType) throws IOException {
210                        sendInternal(data, mediaType);
211                }
212
213                @SuppressWarnings("unchecked")
214                private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
215                        for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {
216                                if (converter.canWrite(data.getClass(), mediaType)) {
217                                        ((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage);
218                                        this.outputMessage.flush();
219                                        return;
220                                }
221                        }
222                        throw new IllegalArgumentException("No suitable converter for " + data.getClass());
223                }
224
225                @Override
226                public void complete() {
227                        try {
228                                this.outputMessage.flush();
229                                this.deferredResult.setResult(null);
230                        }
231                        catch (IOException ex) {
232                                this.deferredResult.setErrorResult(ex);
233                        }
234                }
235
236                @Override
237                public void completeWithError(Throwable failure) {
238                        this.deferredResult.setErrorResult(failure);
239                }
240
241                @Override
242                public void onTimeout(Runnable callback) {
243                        this.deferredResult.onTimeout(callback);
244                }
245
246                @Override
247                public void onError(Consumer<Throwable> callback) {
248                        this.deferredResult.onError(callback);
249                }
250
251                @Override
252                public void onCompletion(Runnable callback) {
253                        this.deferredResult.onCompletion(callback);
254                }
255        }
256
257
258        /**
259         * Wrap to silently ignore header changes HttpMessageConverter's that would
260         * otherwise cause HttpHeaders to raise exceptions.
261         */
262        private static class StreamingServletServerHttpResponse implements ServerHttpResponse {
263
264                private final ServerHttpResponse delegate;
265
266                private final HttpHeaders mutableHeaders = new HttpHeaders();
267
268                public StreamingServletServerHttpResponse(ServerHttpResponse delegate) {
269                        this.delegate = delegate;
270                        this.mutableHeaders.putAll(delegate.getHeaders());
271                }
272
273                @Override
274                public void setStatusCode(HttpStatus status) {
275                        this.delegate.setStatusCode(status);
276                }
277
278                @Override
279                public HttpHeaders getHeaders() {
280                        return this.mutableHeaders;
281                }
282
283                @Override
284                public OutputStream getBody() throws IOException {
285                        return this.delegate.getBody();
286                }
287
288                @Override
289                public void flush() throws IOException {
290                        this.delegate.flush();
291                }
292
293                @Override
294                public void close() {
295                        this.delegate.close();
296                }
297        }
298
299}