/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.mock.http.server.reactive;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseCookie;
import org.springframework.http.server.reactive.AbstractServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;

/**
 * Mock extension of {@link AbstractServerHttpResponse} for use in tests without
 * an actual server.
 *
 * <p>By default response content is consumed in full upon writing and cached
 * for subsequent access, however it is also possible to set a custom
 * {@link #setWriteHandler(Function) writeHandler}.
 *
 * @author Rossen Stoyanchev
 * @since 5.0
 */
public class MockServerHttpResponse extends AbstractServerHttpResponse {

	/**
	 * The response body as exposed through {@link #getBody()}. Starts out as an
	 * error stream and is replaced by the default write handler once content
	 * is written or the response is completed.
	 */
	private Flux<DataBuffer> body = Flux.error(new IllegalStateException(
			"No content was written nor was setComplete() called on this response."));

	/** The function that consumes the body on write, flush and completion. */
	private Function<Flux<DataBuffer>, Mono<Void>> writeHandler;


	/**
	 * Create an instance backed by a {@link DefaultDataBufferFactory}.
	 */
	public MockServerHttpResponse() {
		this(new DefaultDataBufferFactory());
	}

	/**
	 * Create an instance with the given buffer factory and install the default
	 * write handler, which consumes the body in full and caches it.
	 * @param dataBufferFactory the factory passed on to the superclass
	 */
	public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
		super(dataBufferFactory);
		this.writeHandler = body -> {
			// Avoid .then() which causes data buffers to be released
			MonoProcessor<Void> completion = MonoProcessor.create();
			this.body = body.cache();
			// Subscribe with explicit handlers: this both triggers consumption of the
			// body and forwards its terminal signal to the returned Mono, so a failing
			// body never reaches a subscriber that has no error callback.
			this.body.subscribe(buffer -> {}, completion::onError, completion::onComplete);
			return completion;
		};
	}


	/**
	 * Configure a custom handler to consume the response body.
	 * <p>By default, response body content is consumed in full and cached for
	 * subsequent access in tests. Use this option to take control over how the
	 * response body is consumed.
	 * <p>Once a custom handler is set, {@link #getBody()} returns an error stream.
	 * @param writeHandler the write handler to use returning {@code Mono<Void>}
	 * when the body has been "written" (i.e. consumed).
	 */
	public void setWriteHandler(Function<Flux<DataBuffer>, Mono<Void>> writeHandler) {
		Assert.notNull(writeHandler, "'writeHandler' is required");
		this.body = Flux.error(new IllegalStateException("Not available with custom write handler."));
		this.writeHandler = writeHandler;
	}

	/**
	 * Not supported by this mock.
	 * @throws IllegalStateException always
	 */
	@Override
	public <T> T getNativeResponse() {
		throw new IllegalStateException("This is a mock. No running server, no native response.");
	}


	/** No-op in this mock. */
	@Override
	protected void applyStatusCode() {
	}

	/** No-op in this mock. */
	@Override
	protected void applyHeaders() {
	}

	/**
	 * Add one "Set-Cookie" header per cookie returned by {@code getCookies()}.
	 */
	@Override
	protected void applyCookies() {
		for (List<ResponseCookie> cookies : getCookies().values()) {
			for (ResponseCookie cookie : cookies) {
				getHeaders().add(HttpHeaders.SET_COOKIE, cookie.toString());
			}
		}
	}

	/**
	 * Pass the given body to the configured write handler.
	 */
	@Override
	protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
		return this.writeHandler.apply(Flux.from(body));
	}

	/**
	 * Concatenate the nested publishers in order into a single stream and pass
	 * it to the configured write handler.
	 */
	@Override
	protected Mono<Void> writeAndFlushWithInternal(
			Publisher<? extends Publisher<? extends DataBuffer>> body) {

		return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
	}

	/**
	 * Commit the response, passing an empty body to the configured write handler.
	 */
	@Override
	public Mono<Void> setComplete() {
		// Defer so that the write handler is only invoked on subscription
		return doCommit(() -> Mono.defer(() -> this.writeHandler.apply(Flux.empty())));
	}

	/**
	 * Return the response body or an error stream if the body was not set.
	 */
	public Flux<DataBuffer> getBody() {
		return this.body;
	}

	/**
	 * Aggregate response data and convert to a String using the "Content-Type"
	 * charset or "UTF-8" by default.
	 * @return the body as a String, or an empty String if there was no content
	 */
	public Mono<String> getBodyAsString() {

		// Falls back to UTF-8 when there is no content type or it has no charset
		Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset)
				.orElse(StandardCharsets.UTF_8);

		return DataBufferUtils.join(getBody())
				.map(buffer -> {
					try {
						return buffer.toString(charset);
					}
					finally {
						// Release even if decoding fails so the joined buffer is not leaked
						DataBufferUtils.release(buffer);
					}
				})
				.defaultIfEmpty("");
	}

}