001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.test.web.reactive.server; 018 019import java.net.URI; 020import java.util.function.Function; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.reactivestreams.Publisher; 025import reactor.core.publisher.Flux; 026import reactor.core.publisher.Mono; 027import reactor.core.publisher.MonoProcessor; 028import reactor.core.scheduler.Schedulers; 029 030import org.springframework.core.io.buffer.DataBuffer; 031import org.springframework.http.HttpCookie; 032import org.springframework.http.HttpHeaders; 033import org.springframework.http.HttpMethod; 034import org.springframework.http.client.reactive.ClientHttpConnector; 035import org.springframework.http.client.reactive.ClientHttpRequest; 036import org.springframework.http.client.reactive.ClientHttpResponse; 037import org.springframework.http.server.reactive.HttpHandler; 038import org.springframework.http.server.reactive.HttpHeadResponseDecorator; 039import org.springframework.http.server.reactive.ServerHttpRequest; 040import org.springframework.http.server.reactive.ServerHttpResponse; 041import org.springframework.mock.http.client.reactive.MockClientHttpRequest; 042import org.springframework.mock.http.client.reactive.MockClientHttpResponse; 043import org.springframework.mock.http.server.reactive.MockServerHttpRequest; 044import org.springframework.mock.http.server.reactive.MockServerHttpResponse; 045import org.springframework.util.Assert; 046import org.springframework.util.MultiValueMap; 047 048/** 049 * Connector that handles requests by invoking an {@link HttpHandler} rather 050 * than making actual requests to a network socket. 051 * 052 * <p>Internally the connector uses and adapts<br> 053 * {@link MockClientHttpRequest} and {@link MockClientHttpResponse} to<br> 054 * {@link MockServerHttpRequest} and {@link MockServerHttpResponse}. 055 * 056 * @author Rossen Stoyanchev 057 * @since 5.0 058 */ 059public class HttpHandlerConnector implements ClientHttpConnector { 060 061 private static Log logger = LogFactory.getLog(HttpHandlerConnector.class); 062 063 private final HttpHandler handler; 064 065 066 /** 067 * Constructor with the {@link HttpHandler} to handle requests with. 068 */ 069 public HttpHandlerConnector(HttpHandler handler) { 070 Assert.notNull(handler, "HttpHandler is required"); 071 this.handler = handler; 072 } 073 074 075 @Override 076 public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri, 077 Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { 078 079 return Mono.defer(() -> doConnect(httpMethod, uri, requestCallback)) 080 .subscribeOn(Schedulers.parallel()); 081 } 082 083 private Mono<ClientHttpResponse> doConnect( 084 HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { 085 086 MonoProcessor<Void> requestWriteCompletion = MonoProcessor.create(); 087 MonoProcessor<Void> handlerCompletion = MonoProcessor.create(); 088 ClientHttpResponse[] savedResponse = new ClientHttpResponse[1]; 089 090 MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri); 091 MockServerHttpResponse mockServerResponse = new MockServerHttpResponse(); 092 093 mockClientRequest.setWriteHandler(requestBody -> { 094 log("Invoking HttpHandler for ", httpMethod, uri); 095 ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody); 096 ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest); 097 this.handler.handle(mockServerRequest, responseToUse).subscribe(handlerCompletion); 098 return Mono.empty(); 099 }); 100 101 mockServerResponse.setWriteHandler(responseBody -> 102 Mono.fromRunnable(() -> { 103 log("Creating client response for ", httpMethod, uri); 104 savedResponse[0] = adaptResponse(mockServerResponse, responseBody); 105 })); 106 107 log("Writing client request for ", httpMethod, uri); 108 requestCallback.apply(mockClientRequest).subscribe(requestWriteCompletion); 109 110 return Mono.when(requestWriteCompletion, handlerCompletion) 111 .onErrorMap(ex -> { 112 ClientHttpResponse response = savedResponse[0]; 113 return response != null ? new FailureAfterResponseCompletedException(response, ex) : ex; 114 }) 115 .then(Mono.fromCallable(() -> savedResponse[0] != null ? 116 savedResponse[0] : adaptResponse(mockServerResponse, Flux.empty()))); 117 } 118 119 private void log(String message, HttpMethod httpMethod, URI uri) { 120 if (logger.isDebugEnabled()) { 121 logger.debug(String.format("%s %s \"%s\"", message, httpMethod, uri)); 122 } 123 } 124 125 private ServerHttpRequest adaptRequest(MockClientHttpRequest request, Publisher<DataBuffer> body) { 126 HttpMethod method = request.getMethod(); 127 URI uri = request.getURI(); 128 HttpHeaders headers = request.getHeaders(); 129 MultiValueMap<String, HttpCookie> cookies = request.getCookies(); 130 return MockServerHttpRequest.method(method, uri).headers(headers).cookies(cookies).body(body); 131 } 132 133 private ServerHttpResponse prepareResponse(ServerHttpResponse response, ServerHttpRequest request) { 134 return (request.getMethod() == HttpMethod.HEAD ? new HttpHeadResponseDecorator(response) : response); 135 } 136 137 private ClientHttpResponse adaptResponse(MockServerHttpResponse response, Flux<DataBuffer> body) { 138 Integer status = response.getRawStatusCode(); 139 MockClientHttpResponse clientResponse = new MockClientHttpResponse((status != null) ? status : 200); 140 clientResponse.getHeaders().putAll(response.getHeaders()); 141 clientResponse.getCookies().putAll(response.getCookies()); 142 clientResponse.setBody(body); 143 return clientResponse; 144 } 145 146 147 /** 148 * Indicates that an error occurred after the server response was completed, 149 * via {@link ServerHttpResponse#writeWith} or {@link ServerHttpResponse#setComplete()}, 150 * and can no longer be changed. This exception wraps the error and also 151 * provides {@link #getCompletedResponse() access} to the response. 152 * <p>What happens on an actual running server depends on when the server 153 * commits the response and the error may or may not change the response. 154 * Therefore in tests without a server the exception is wrapped and allowed 155 * to propagate so the application is alerted. 156 * @since 5.2.2 157 */ 158 @SuppressWarnings("serial") 159 public static final class FailureAfterResponseCompletedException extends RuntimeException { 160 161 private final ClientHttpResponse completedResponse; 162 163 164 private FailureAfterResponseCompletedException(ClientHttpResponse response, Throwable cause) { 165 super("Error occurred after response was completed: " + response, cause); 166 this.completedResponse = response; 167 } 168 169 170 public ClientHttpResponse getCompletedResponse() { 171 return this.completedResponse; 172 } 173 } 174 175}