001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.mock.http.server.reactive;
018
019import java.nio.charset.Charset;
020import java.nio.charset.StandardCharsets;
021import java.util.List;
022import java.util.Optional;
023import java.util.function.Function;
024
025import org.reactivestreams.Publisher;
026import reactor.core.publisher.Flux;
027import reactor.core.publisher.Mono;
028import reactor.core.publisher.MonoProcessor;
029
030import org.springframework.core.io.buffer.DataBuffer;
031import org.springframework.core.io.buffer.DataBufferFactory;
032import org.springframework.core.io.buffer.DataBufferUtils;
033import org.springframework.core.io.buffer.DefaultDataBufferFactory;
034import org.springframework.http.HttpHeaders;
035import org.springframework.http.ResponseCookie;
036import org.springframework.http.server.reactive.AbstractServerHttpResponse;
037import org.springframework.util.Assert;
038import org.springframework.util.MimeType;
039
040/**
041 * Mock extension of {@link AbstractServerHttpResponse} for use in tests without
042 * an actual server.
043 *
044 * <p>By default response content is consumed in full upon writing and cached
045 * for subsequent access, however it is also possible to set a custom
046 * {@link #setWriteHandler(Function) writeHandler}.
047 *
048 * @author Rossen Stoyanchev
049 * @since 5.0
050 */
051public class MockServerHttpResponse extends AbstractServerHttpResponse {
052
053        private Flux<DataBuffer> body = Flux.error(new IllegalStateException(
054                        "No content was written nor was setComplete() called on this response."));
055
056        private Function<Flux<DataBuffer>, Mono<Void>> writeHandler;
057
058
059        public MockServerHttpResponse() {
060                this(new DefaultDataBufferFactory());
061        }
062
063        public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
064                super(dataBufferFactory);
065                this.writeHandler = body -> {
066                        // Avoid .then() which causes data buffers to be released
067                        MonoProcessor<Void> completion = MonoProcessor.create();
068                        this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
069                        this.body.subscribe();
070                        return completion;
071                };
072        }
073
074
075        /**
076         * Configure a custom handler to consume the response body.
077         * <p>By default, response body content is consumed in full and cached for
078         * subsequent access in tests. Use this option to take control over how the
079         * response body is consumed.
080         * @param writeHandler the write handler to use returning {@code Mono<Void>}
081         * when the body has been "written" (i.e. consumed).
082         */
083        public void setWriteHandler(Function<Flux<DataBuffer>, Mono<Void>> writeHandler) {
084                Assert.notNull(writeHandler, "'writeHandler' is required");
085                this.body = Flux.error(new IllegalStateException("Not available with custom write handler."));
086                this.writeHandler = writeHandler;
087        }
088
089        @Override
090        public <T> T getNativeResponse() {
091                throw new IllegalStateException("This is a mock. No running server, no native response.");
092        }
093
094
095        @Override
096        protected void applyStatusCode() {
097        }
098
099        @Override
100        protected void applyHeaders() {
101        }
102
103        @Override
104        protected void applyCookies() {
105                for (List<ResponseCookie> cookies : getCookies().values()) {
106                        for (ResponseCookie cookie : cookies) {
107                                getHeaders().add(HttpHeaders.SET_COOKIE, cookie.toString());
108                        }
109                }
110        }
111
112        @Override
113        protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
114                return this.writeHandler.apply(Flux.from(body));
115        }
116
117        @Override
118        protected Mono<Void> writeAndFlushWithInternal(
119                        Publisher<? extends Publisher<? extends DataBuffer>> body) {
120
121                return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
122        }
123
124        @Override
125        public Mono<Void> setComplete() {
126                return doCommit(() -> Mono.defer(() -> this.writeHandler.apply(Flux.empty())));
127        }
128
129        /**
130         * Return the response body or an error stream if the body was not set.
131         */
132        public Flux<DataBuffer> getBody() {
133                return this.body;
134        }
135
136        /**
137         * Aggregate response data and convert to a String using the "Content-Type"
138         * charset or "UTF-8" by default.
139         */
140        public Mono<String> getBodyAsString() {
141
142                Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset)
143                                .orElse(StandardCharsets.UTF_8);
144
145                return DataBufferUtils.join(getBody())
146                                .map(buffer -> {
147                                        String s = buffer.toString(charset);
148                                        DataBufferUtils.release(buffer);
149                                        return s;
150                                })
151                                .defaultIfEmpty("");
152        }
153
154}