001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.server.reactive;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.atomic.AtomicReference;
022import java.util.function.Supplier;
023
024import org.apache.commons.logging.Log;
025import org.reactivestreams.Publisher;
026import reactor.core.publisher.Flux;
027import reactor.core.publisher.Mono;
028
029import org.springframework.core.io.buffer.DataBuffer;
030import org.springframework.core.io.buffer.DataBufferFactory;
031import org.springframework.core.io.buffer.DataBufferUtils;
032import org.springframework.core.io.buffer.PooledDataBuffer;
033import org.springframework.http.HttpHeaders;
034import org.springframework.http.HttpLogging;
035import org.springframework.http.HttpStatus;
036import org.springframework.http.ResponseCookie;
037import org.springframework.lang.Nullable;
038import org.springframework.util.Assert;
039import org.springframework.util.CollectionUtils;
040import org.springframework.util.LinkedMultiValueMap;
041import org.springframework.util.MultiValueMap;
042
043/**
044 * Base class for {@link ServerHttpResponse} implementations.
045 *
046 * @author Rossen Stoyanchev
047 * @author Juergen Hoeller
048 * @author Sebastien Deleuze
049 * @author Brian Clozel
050 * @since 5.0
051 */
052public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
053
054        /**
055         * COMMITTING -> COMMITTED is the period after doCommit is called but before
056         * the response status and headers have been applied to the underlying
057         * response during which time pre-commit actions can still make changes to
058         * the response status and headers.
059         */
060        private enum State {NEW, COMMITTING, COMMITTED}
061
062        protected final Log logger = HttpLogging.forLogName(getClass());
063
064
065        private final DataBufferFactory dataBufferFactory;
066
067        @Nullable
068        private Integer statusCode;
069
070        private final HttpHeaders headers;
071
072        private final MultiValueMap<String, ResponseCookie> cookies;
073
074        private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
075
076        private final List<Supplier<? extends Mono<Void>>> commitActions = new ArrayList<>(4);
077
078
079        public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory) {
080                this(dataBufferFactory, new HttpHeaders());
081        }
082
083        public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) {
084                Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
085                Assert.notNull(headers, "HttpHeaders must not be null");
086                this.dataBufferFactory = dataBufferFactory;
087                this.headers = headers;
088                this.cookies = new LinkedMultiValueMap<>();
089        }
090
091
092        @Override
093        public final DataBufferFactory bufferFactory() {
094                return this.dataBufferFactory;
095        }
096
097        @Override
098        public boolean setStatusCode(@Nullable HttpStatus status) {
099                if (this.state.get() == State.COMMITTED) {
100                        return false;
101                }
102                else {
103                        this.statusCode = (status != null ? status.value() : null);
104                        return true;
105                }
106        }
107
108        @Override
109        @Nullable
110        public HttpStatus getStatusCode() {
111                return (this.statusCode != null ? HttpStatus.resolve(this.statusCode) : null);
112        }
113
114        @Override
115        public boolean setRawStatusCode(@Nullable Integer statusCode) {
116                if (this.state.get() == State.COMMITTED) {
117                        return false;
118                }
119                else {
120                        this.statusCode = statusCode;
121                        return true;
122                }
123        }
124
125        @Override
126        @Nullable
127        public Integer getRawStatusCode() {
128                return this.statusCode;
129        }
130
131        /**
132         * Set the HTTP status code of the response.
133         * @param statusCode the HTTP status as an integer value
134         * @since 5.0.1
135         * @deprecated as of 5.2.4 in favor of {@link ServerHttpResponse#setRawStatusCode(Integer)}.
136         */
137        @Deprecated
138        public void setStatusCodeValue(@Nullable Integer statusCode) {
139                if (this.state.get() != State.COMMITTED) {
140                        this.statusCode = statusCode;
141                }
142        }
143
144        /**
145         * Return the HTTP status code of the response.
146         * @return the HTTP status as an integer value
147         * @since 5.0.1
148         * @deprecated as of 5.2.4 in favor of {@link ServerHttpResponse#getRawStatusCode()}.
149         */
150        @Nullable
151        @Deprecated
152        public Integer getStatusCodeValue() {
153                return this.statusCode;
154        }
155
156        @Override
157        public HttpHeaders getHeaders() {
158                return (this.state.get() == State.COMMITTED ?
159                                HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
160        }
161
162        @Override
163        public MultiValueMap<String, ResponseCookie> getCookies() {
164                return (this.state.get() == State.COMMITTED ?
165                                CollectionUtils.unmodifiableMultiValueMap(this.cookies) : this.cookies);
166        }
167
168        @Override
169        public void addCookie(ResponseCookie cookie) {
170                Assert.notNull(cookie, "ResponseCookie must not be null");
171
172                if (this.state.get() == State.COMMITTED) {
173                        throw new IllegalStateException("Can't add the cookie " + cookie +
174                                        "because the HTTP response has already been committed");
175                }
176                else {
177                        getCookies().add(cookie.getName(), cookie);
178                }
179        }
180
181        /**
182         * Return the underlying server response.
183         * <p><strong>Note:</strong> This is exposed mainly for internal framework
184         * use such as WebSocket upgrades in the spring-webflux module.
185         */
186        public abstract <T> T getNativeResponse();
187
188
189        @Override
190        public void beforeCommit(Supplier<? extends Mono<Void>> action) {
191                this.commitActions.add(action);
192        }
193
194        @Override
195        public boolean isCommitted() {
196                return this.state.get() != State.NEW;
197        }
198
199        @Override
200        @SuppressWarnings("unchecked")
201        public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
202                // For Mono we can avoid ChannelSendOperator and Reactor Netty is more optimized for Mono.
203                // We must resolve value first however, for a chance to handle potential error.
204                if (body instanceof Mono) {
205                        return ((Mono<? extends DataBuffer>) body)
206                                        .flatMap(buffer -> {
207                                                AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
208                                                return doCommit(
209                                                                () -> {
210                                                                        try {
211                                                                                return writeWithInternal(Mono.fromCallable(() -> buffer)
212                                                                                                .doOnSubscribe(s -> subscribed.set(true))
213                                                                                                .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
214                                                                        }
215                                                                        catch (Throwable ex) {
216                                                                                return Mono.error(ex);
217                                                                        }
218                                                                })
219                                                                .doOnError(ex -> DataBufferUtils.release(buffer))
220                                                                .doOnCancel(() -> {
221                                                                        if (!subscribed.get()) {
222                                                                                DataBufferUtils.release(buffer);
223                                                                        }
224                                                                });
225                                        })
226                                        .doOnError(t -> getHeaders().clearContentHeaders())
227                                        .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
228                }
229                else {
230                        return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
231                                        .doOnError(t -> getHeaders().clearContentHeaders());
232                }
233        }
234
235        @Override
236        public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
237                return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeAndFlushWithInternal(inner)))
238                                .doOnError(t -> getHeaders().clearContentHeaders());
239        }
240
241        @Override
242        public Mono<Void> setComplete() {
243                return !isCommitted() ? doCommit(null) : Mono.empty();
244        }
245
246        /**
247         * A variant of {@link #doCommit(Supplier)} for a response without no body.
248         * @return a completion publisher
249         */
250        protected Mono<Void> doCommit() {
251                return doCommit(null);
252        }
253
254        /**
255         * Apply {@link #beforeCommit(Supplier) beforeCommit} actions, apply the
256         * response status and headers/cookies, and write the response body.
257         * @param writeAction the action to write the response body (may be {@code null})
258         * @return a completion publisher
259         */
260        protected Mono<Void> doCommit(@Nullable Supplier<? extends Mono<Void>> writeAction) {
261                if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) {
262                        return Mono.empty();
263                }
264
265                Flux<Void> allActions = Flux.empty();
266
267                if (!this.commitActions.isEmpty()) {
268                        allActions = Flux.concat(Flux.fromIterable(this.commitActions).map(Supplier::get))
269                                        .doOnError(ex -> {
270                                                if (this.state.compareAndSet(State.COMMITTING, State.NEW)) {
271                                                        getHeaders().clearContentHeaders();
272                                                }
273                                        });
274                }
275
276                allActions = allActions.concatWith(Mono.fromRunnable(() -> {
277                        applyStatusCode();
278                        applyHeaders();
279                        applyCookies();
280                        this.state.set(State.COMMITTED);
281                }));
282
283                if (writeAction != null) {
284                        allActions = allActions.concatWith(writeAction.get());
285                }
286
287                return allActions.then();
288        }
289
290
291        /**
292         * Write to the underlying the response.
293         * @param body the publisher to write with
294         */
295        protected abstract Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body);
296
297        /**
298         * Write to the underlying the response, and flush after each {@code Publisher<DataBuffer>}.
299         * @param body the publisher to write and flush with
300         */
301        protected abstract Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body);
302
303        /**
304         * Write the status code to the underlying response.
305         * This method is called once only.
306         */
307        protected abstract void applyStatusCode();
308
309        /**
310         * Invoked when the response is getting committed allowing sub-classes to
311         * make apply header values to the underlying response.
312         * <p>Note that most sub-classes use an {@link HttpHeaders} instance that
313         * wraps an adapter to the native response headers such that changes are
314         * propagated to the underlying response on the go. That means this callback
315         * is typically not used other than for specialized updates such as setting
316         * the contentType or characterEncoding fields in a Servlet response.
317         */
318        protected abstract void applyHeaders();
319
320        /**
321         * Add cookies from {@link #getHeaders()} to the underlying response.
322         * This method is called once only.
323         */
324        protected abstract void applyCookies();
325
326}