001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.server.reactive; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.atomic.AtomicReference; 022import java.util.function.Supplier; 023 024import org.apache.commons.logging.Log; 025import org.reactivestreams.Publisher; 026import reactor.core.publisher.Flux; 027import reactor.core.publisher.Mono; 028 029import org.springframework.core.io.buffer.DataBuffer; 030import org.springframework.core.io.buffer.DataBufferFactory; 031import org.springframework.core.io.buffer.DataBufferUtils; 032import org.springframework.core.io.buffer.PooledDataBuffer; 033import org.springframework.http.HttpHeaders; 034import org.springframework.http.HttpLogging; 035import org.springframework.http.HttpStatus; 036import org.springframework.http.ResponseCookie; 037import org.springframework.lang.Nullable; 038import org.springframework.util.Assert; 039import org.springframework.util.CollectionUtils; 040import org.springframework.util.LinkedMultiValueMap; 041import org.springframework.util.MultiValueMap; 042 043/** 044 * Base class for {@link ServerHttpResponse} implementations. 045 * 046 * @author Rossen Stoyanchev 047 * @author Juergen Hoeller 048 * @author Sebastien Deleuze 049 * @author Brian Clozel 050 * @since 5.0 051 */ 052public abstract class AbstractServerHttpResponse implements ServerHttpResponse { 053 054 /** 055 * COMMITTING -> COMMITTED is the period after doCommit is called but before 056 * the response status and headers have been applied to the underlying 057 * response during which time pre-commit actions can still make changes to 058 * the response status and headers. 059 */ 060 private enum State {NEW, COMMITTING, COMMITTED} 061 062 protected final Log logger = HttpLogging.forLogName(getClass()); 063 064 065 private final DataBufferFactory dataBufferFactory; 066 067 @Nullable 068 private Integer statusCode; 069 070 private final HttpHeaders headers; 071 072 private final MultiValueMap<String, ResponseCookie> cookies; 073 074 private final AtomicReference<State> state = new AtomicReference<>(State.NEW); 075 076 private final List<Supplier<? extends Mono<Void>>> commitActions = new ArrayList<>(4); 077 078 079 public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory) { 080 this(dataBufferFactory, new HttpHeaders()); 081 } 082 083 public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) { 084 Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null"); 085 Assert.notNull(headers, "HttpHeaders must not be null"); 086 this.dataBufferFactory = dataBufferFactory; 087 this.headers = headers; 088 this.cookies = new LinkedMultiValueMap<>(); 089 } 090 091 092 @Override 093 public final DataBufferFactory bufferFactory() { 094 return this.dataBufferFactory; 095 } 096 097 @Override 098 public boolean setStatusCode(@Nullable HttpStatus status) { 099 if (this.state.get() == State.COMMITTED) { 100 return false; 101 } 102 else { 103 this.statusCode = (status != null ? status.value() : null); 104 return true; 105 } 106 } 107 108 @Override 109 @Nullable 110 public HttpStatus getStatusCode() { 111 return (this.statusCode != null ? HttpStatus.resolve(this.statusCode) : null); 112 } 113 114 @Override 115 public boolean setRawStatusCode(@Nullable Integer statusCode) { 116 if (this.state.get() == State.COMMITTED) { 117 return false; 118 } 119 else { 120 this.statusCode = statusCode; 121 return true; 122 } 123 } 124 125 @Override 126 @Nullable 127 public Integer getRawStatusCode() { 128 return this.statusCode; 129 } 130 131 /** 132 * Set the HTTP status code of the response. 133 * @param statusCode the HTTP status as an integer value 134 * @since 5.0.1 135 * @deprecated as of 5.2.4 in favor of {@link ServerHttpResponse#setRawStatusCode(Integer)}. 136 */ 137 @Deprecated 138 public void setStatusCodeValue(@Nullable Integer statusCode) { 139 if (this.state.get() != State.COMMITTED) { 140 this.statusCode = statusCode; 141 } 142 } 143 144 /** 145 * Return the HTTP status code of the response. 146 * @return the HTTP status as an integer value 147 * @since 5.0.1 148 * @deprecated as of 5.2.4 in favor of {@link ServerHttpResponse#getRawStatusCode()}. 149 */ 150 @Nullable 151 @Deprecated 152 public Integer getStatusCodeValue() { 153 return this.statusCode; 154 } 155 156 @Override 157 public HttpHeaders getHeaders() { 158 return (this.state.get() == State.COMMITTED ? 159 HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); 160 } 161 162 @Override 163 public MultiValueMap<String, ResponseCookie> getCookies() { 164 return (this.state.get() == State.COMMITTED ? 165 CollectionUtils.unmodifiableMultiValueMap(this.cookies) : this.cookies); 166 } 167 168 @Override 169 public void addCookie(ResponseCookie cookie) { 170 Assert.notNull(cookie, "ResponseCookie must not be null"); 171 172 if (this.state.get() == State.COMMITTED) { 173 throw new IllegalStateException("Can't add the cookie " + cookie + 174 "because the HTTP response has already been committed"); 175 } 176 else { 177 getCookies().add(cookie.getName(), cookie); 178 } 179 } 180 181 /** 182 * Return the underlying server response. 183 * <p><strong>Note:</strong> This is exposed mainly for internal framework 184 * use such as WebSocket upgrades in the spring-webflux module. 185 */ 186 public abstract <T> T getNativeResponse(); 187 188 189 @Override 190 public void beforeCommit(Supplier<? extends Mono<Void>> action) { 191 this.commitActions.add(action); 192 } 193 194 @Override 195 public boolean isCommitted() { 196 return this.state.get() != State.NEW; 197 } 198 199 @Override 200 @SuppressWarnings("unchecked") 201 public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { 202 // For Mono we can avoid ChannelSendOperator and Reactor Netty is more optimized for Mono. 203 // We must resolve value first however, for a chance to handle potential error. 204 if (body instanceof Mono) { 205 return ((Mono<? extends DataBuffer>) body) 206 .flatMap(buffer -> { 207 AtomicReference<Boolean> subscribed = new AtomicReference<>(false); 208 return doCommit( 209 () -> { 210 try { 211 return writeWithInternal(Mono.fromCallable(() -> buffer) 212 .doOnSubscribe(s -> subscribed.set(true)) 213 .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); 214 } 215 catch (Throwable ex) { 216 return Mono.error(ex); 217 } 218 }) 219 .doOnError(ex -> DataBufferUtils.release(buffer)) 220 .doOnCancel(() -> { 221 if (!subscribed.get()) { 222 DataBufferUtils.release(buffer); 223 } 224 }); 225 }) 226 .doOnError(t -> getHeaders().clearContentHeaders()) 227 .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); 228 } 229 else { 230 return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner))) 231 .doOnError(t -> getHeaders().clearContentHeaders()); 232 } 233 } 234 235 @Override 236 public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) { 237 return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeAndFlushWithInternal(inner))) 238 .doOnError(t -> getHeaders().clearContentHeaders()); 239 } 240 241 @Override 242 public Mono<Void> setComplete() { 243 return !isCommitted() ? doCommit(null) : Mono.empty(); 244 } 245 246 /** 247 * A variant of {@link #doCommit(Supplier)} for a response without no body. 248 * @return a completion publisher 249 */ 250 protected Mono<Void> doCommit() { 251 return doCommit(null); 252 } 253 254 /** 255 * Apply {@link #beforeCommit(Supplier) beforeCommit} actions, apply the 256 * response status and headers/cookies, and write the response body. 257 * @param writeAction the action to write the response body (may be {@code null}) 258 * @return a completion publisher 259 */ 260 protected Mono<Void> doCommit(@Nullable Supplier<? extends Mono<Void>> writeAction) { 261 if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) { 262 return Mono.empty(); 263 } 264 265 Flux<Void> allActions = Flux.empty(); 266 267 if (!this.commitActions.isEmpty()) { 268 allActions = Flux.concat(Flux.fromIterable(this.commitActions).map(Supplier::get)) 269 .doOnError(ex -> { 270 if (this.state.compareAndSet(State.COMMITTING, State.NEW)) { 271 getHeaders().clearContentHeaders(); 272 } 273 }); 274 } 275 276 allActions = allActions.concatWith(Mono.fromRunnable(() -> { 277 applyStatusCode(); 278 applyHeaders(); 279 applyCookies(); 280 this.state.set(State.COMMITTED); 281 })); 282 283 if (writeAction != null) { 284 allActions = allActions.concatWith(writeAction.get()); 285 } 286 287 return allActions.then(); 288 } 289 290 291 /** 292 * Write to the underlying the response. 293 * @param body the publisher to write with 294 */ 295 protected abstract Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body); 296 297 /** 298 * Write to the underlying the response, and flush after each {@code Publisher<DataBuffer>}. 299 * @param body the publisher to write and flush with 300 */ 301 protected abstract Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body); 302 303 /** 304 * Write the status code to the underlying response. 305 * This method is called once only. 306 */ 307 protected abstract void applyStatusCode(); 308 309 /** 310 * Invoked when the response is getting committed allowing sub-classes to 311 * make apply header values to the underlying response. 312 * <p>Note that most sub-classes use an {@link HttpHeaders} instance that 313 * wraps an adapter to the native response headers such that changes are 314 * propagated to the underlying response on the go. That means this callback 315 * is typically not used other than for specialized updates such as setting 316 * the contentType or characterEncoding fields in a Servlet response. 317 */ 318 protected abstract void applyHeaders(); 319 320 /** 321 * Add cookies from {@link #getHeaders()} to the underlying response. 322 * This method is called once only. 323 */ 324 protected abstract void applyCookies(); 325 326}