001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.servlet.mvc.method.annotation;
018
019import java.io.IOException;
020import java.util.LinkedHashSet;
021import java.util.Set;
022import java.util.function.Consumer;
023
024import org.springframework.http.MediaType;
025import org.springframework.http.server.ServerHttpResponse;
026import org.springframework.lang.Nullable;
027import org.springframework.util.Assert;
028import org.springframework.util.ObjectUtils;
029
030/**
031 * A controller method return value type for asynchronous request processing
032 * where one or more objects are written to the response.
033 *
034 * <p>While {@link org.springframework.web.context.request.async.DeferredResult}
035 * is used to produce a single result, a {@code ResponseBodyEmitter} can be used
036 * to send multiple objects where each object is written with a compatible
037 * {@link org.springframework.http.converter.HttpMessageConverter}.
038 *
039 * <p>Supported as a return type on its own as well as within a
040 * {@link org.springframework.http.ResponseEntity}.
041 *
042 * <pre>
043 * &#064;RequestMapping(value="/stream", method=RequestMethod.GET)
044 * public ResponseBodyEmitter handle() {
045 *         ResponseBodyEmitter emitter = new ResponseBodyEmitter();
046 *         // Pass the emitter to another component...
047 *         return emitter;
048 * }
049 *
050 * // in another thread
051 * emitter.send(foo1);
052 *
053 * // and again
054 * emitter.send(foo2);
055 *
056 * // and done
057 * emitter.complete();
058 * </pre>
059 *
060 * @author Rossen Stoyanchev
061 * @author Juergen Hoeller
062 * @since 4.2
063 */
064public class ResponseBodyEmitter {
065
066        @Nullable
067        private final Long timeout;
068
069        @Nullable
070        private Handler handler;
071
072        /** Store send data before handler is initialized. */
073        private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(8);
074
075        /** Store successful completion before the handler is initialized. */
076        private boolean complete;
077
078        /** Store an error before the handler is initialized. */
079        @Nullable
080        private Throwable failure;
081
082        /**
083         * After an I/O error, we don't call {@link #completeWithError} directly but
084         * wait for the Servlet container to call us via {@code AsyncListener#onError}
085         * on a container thread at which point we call completeWithError.
086         * This flag is used to ignore further calls to complete or completeWithError
087         * that may come for example from an application try-catch block on the
088         * thread of the I/O error.
089         */
090        private boolean sendFailed;
091
092        private final DefaultCallback timeoutCallback = new DefaultCallback();
093
094        private final ErrorCallback errorCallback = new ErrorCallback();
095
096        private final DefaultCallback completionCallback = new DefaultCallback();
097
098
099        /**
100         * Create a new ResponseBodyEmitter instance.
101         */
102        public ResponseBodyEmitter() {
103                this.timeout = null;
104        }
105
106        /**
107         * Create a ResponseBodyEmitter with a custom timeout value.
108         * <p>By default not set in which case the default configured in the MVC
109         * Java Config or the MVC namespace is used, or if that's not set, then the
110         * timeout depends on the default of the underlying server.
111         * @param timeout the timeout value in milliseconds
112         */
113        public ResponseBodyEmitter(Long timeout) {
114                this.timeout = timeout;
115        }
116
117
118        /**
119         * Return the configured timeout value, if any.
120         */
121        @Nullable
122        public Long getTimeout() {
123                return this.timeout;
124        }
125
126
127        synchronized void initialize(Handler handler) throws IOException {
128                this.handler = handler;
129
130                try {
131                        for (DataWithMediaType sendAttempt : this.earlySendAttempts) {
132                                sendInternal(sendAttempt.getData(), sendAttempt.getMediaType());
133                        }
134                }
135                finally {
136                        this.earlySendAttempts.clear();
137                }
138
139                if (this.complete) {
140                        if (this.failure != null) {
141                                this.handler.completeWithError(this.failure);
142                        }
143                        else {
144                                this.handler.complete();
145                        }
146                }
147                else {
148                        this.handler.onTimeout(this.timeoutCallback);
149                        this.handler.onError(this.errorCallback);
150                        this.handler.onCompletion(this.completionCallback);
151                }
152        }
153
154        synchronized void initializeWithError(Throwable ex) {
155                this.complete = true;
156                this.failure = ex;
157                this.earlySendAttempts.clear();
158                this.errorCallback.accept(ex);
159        }
160
161        /**
162         * Invoked after the response is updated with the status code and headers,
163         * if the ResponseBodyEmitter is wrapped in a ResponseEntity, but before the
164         * response is committed, i.e. before the response body has been written to.
165         * <p>The default implementation is empty.
166         */
167        protected void extendResponse(ServerHttpResponse outputMessage) {
168        }
169
170        /**
171         * Write the given object to the response.
172         * <p>If any exception occurs a dispatch is made back to the app server where
173         * Spring MVC will pass the exception through its exception handling mechanism.
174         * <p><strong>Note:</strong> if the send fails with an IOException, you do
175         * not need to call {@link #completeWithError(Throwable)} in order to clean
176         * up. Instead the Servlet container creates a notification that results in a
177         * dispatch where Spring MVC invokes exception resolvers and completes
178         * processing.
179         * @param object the object to write
180         * @throws IOException raised when an I/O error occurs
181         * @throws java.lang.IllegalStateException wraps any other errors
182         */
183        public void send(Object object) throws IOException {
184                send(object, null);
185        }
186
187        /**
188         * Overloaded variant of {@link #send(Object)} that also accepts a MediaType
189         * hint for how to serialize the given Object.
190         * @param object the object to write
191         * @param mediaType a MediaType hint for selecting an HttpMessageConverter
192         * @throws IOException raised when an I/O error occurs
193         * @throws java.lang.IllegalStateException wraps any other errors
194         */
195        public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
196                Assert.state(!this.complete,
197                                "ResponseBodyEmitter has already completed" +
198                                                (this.failure != null ? " with error: " + this.failure : ""));
199                sendInternal(object, mediaType);
200        }
201
202        private void sendInternal(Object object, @Nullable MediaType mediaType) throws IOException {
203                if (this.handler != null) {
204                        try {
205                                this.handler.send(object, mediaType);
206                        }
207                        catch (IOException ex) {
208                                this.sendFailed = true;
209                                throw ex;
210                        }
211                        catch (Throwable ex) {
212                                this.sendFailed = true;
213                                throw new IllegalStateException("Failed to send " + object, ex);
214                        }
215                }
216                else {
217                        this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
218                }
219        }
220
221        /**
222         * Complete request processing by performing a dispatch into the servlet
223         * container, where Spring MVC is invoked once more, and completes the
224         * request processing lifecycle.
225         * <p><strong>Note:</strong> this method should be called by the application
226         * to complete request processing. It should not be used after container
227         * related events such as an error while {@link #send(Object) sending}.
228         */
229        public synchronized void complete() {
230                // Ignore, after send failure
231                if (this.sendFailed) {
232                        return;
233                }
234                this.complete = true;
235                if (this.handler != null) {
236                        this.handler.complete();
237                }
238        }
239
240        /**
241         * Complete request processing with an error.
242         * <p>A dispatch is made into the app server where Spring MVC will pass the
243         * exception through its exception handling mechanism. Note however that
244         * at this stage of request processing, the response is committed and the
245         * response status can no longer be changed.
246         * <p><strong>Note:</strong> this method should be called by the application
247         * to complete request processing with an error. It should not be used after
248         * container related events such as an error while
249         * {@link #send(Object) sending}.
250         */
251        public synchronized void completeWithError(Throwable ex) {
252                // Ignore, after send failure
253                if (this.sendFailed) {
254                        return;
255                }
256                this.complete = true;
257                this.failure = ex;
258                if (this.handler != null) {
259                        this.handler.completeWithError(ex);
260                }
261        }
262
263        /**
264         * Register code to invoke when the async request times out. This method is
265         * called from a container thread when an async request times out.
266         */
267        public synchronized void onTimeout(Runnable callback) {
268                this.timeoutCallback.setDelegate(callback);
269        }
270
271        /**
272         * Register code to invoke for an error during async request processing.
273         * This method is called from a container thread when an error occurred
274         * while processing an async request.
275         * @since 5.0
276         */
277        public synchronized void onError(Consumer<Throwable> callback) {
278                this.errorCallback.setDelegate(callback);
279        }
280
281        /**
282         * Register code to invoke when the async request completes. This method is
283         * called from a container thread when an async request completed for any
284         * reason including timeout and network error. This method is useful for
285         * detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
286         */
287        public synchronized void onCompletion(Runnable callback) {
288                this.completionCallback.setDelegate(callback);
289        }
290
291
292        @Override
293        public String toString() {
294                return "ResponseBodyEmitter@" + ObjectUtils.getIdentityHexString(this);
295        }
296
297
298        /**
299         * Contract to handle the sending of event data, the completion of event
300         * sending, and the registration of callbacks to be invoked in case of
301         * timeout, error, and completion for any reason (including from the
302         * container side).
303         */
304        interface Handler {
305
306                void send(Object data, @Nullable MediaType mediaType) throws IOException;
307
308                void complete();
309
310                void completeWithError(Throwable failure);
311
312                void onTimeout(Runnable callback);
313
314                void onError(Consumer<Throwable> callback);
315
316                void onCompletion(Runnable callback);
317        }
318
319
320        /**
321         * A simple holder of data to be written along with a MediaType hint for
322         * selecting a message converter to write with.
323         */
324        public static class DataWithMediaType {
325
326                private final Object data;
327
328                @Nullable
329                private final MediaType mediaType;
330
331                public DataWithMediaType(Object data, @Nullable MediaType mediaType) {
332                        this.data = data;
333                        this.mediaType = mediaType;
334                }
335
336                public Object getData() {
337                        return this.data;
338                }
339
340                @Nullable
341                public MediaType getMediaType() {
342                        return this.mediaType;
343                }
344        }
345
346
347        private class DefaultCallback implements Runnable {
348
349                @Nullable
350                private Runnable delegate;
351
352                public void setDelegate(Runnable delegate) {
353                        this.delegate = delegate;
354                }
355
356                @Override
357                public void run() {
358                        ResponseBodyEmitter.this.complete = true;
359                        if (this.delegate != null) {
360                                this.delegate.run();
361                        }
362                }
363        }
364
365
366        private class ErrorCallback implements Consumer<Throwable> {
367
368                @Nullable
369                private Consumer<Throwable> delegate;
370
371                public void setDelegate(Consumer<Throwable> callback) {
372                        this.delegate = callback;
373                }
374
375                @Override
376                public void accept(Throwable t) {
377                        ResponseBodyEmitter.this.complete = true;
378                        if (this.delegate != null) {
379                                this.delegate.accept(t);
380                        }
381                }
382        }
383
384}