001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.test.web.reactive.server;
018
019import java.net.URI;
020import java.util.function.Function;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.reactivestreams.Publisher;
025import reactor.core.publisher.Flux;
026import reactor.core.publisher.Mono;
027import reactor.core.publisher.MonoProcessor;
028import reactor.core.scheduler.Schedulers;
029
030import org.springframework.core.io.buffer.DataBuffer;
031import org.springframework.http.HttpCookie;
032import org.springframework.http.HttpHeaders;
033import org.springframework.http.HttpMethod;
034import org.springframework.http.client.reactive.ClientHttpConnector;
035import org.springframework.http.client.reactive.ClientHttpRequest;
036import org.springframework.http.client.reactive.ClientHttpResponse;
037import org.springframework.http.server.reactive.HttpHandler;
038import org.springframework.http.server.reactive.HttpHeadResponseDecorator;
039import org.springframework.http.server.reactive.ServerHttpRequest;
040import org.springframework.http.server.reactive.ServerHttpResponse;
041import org.springframework.mock.http.client.reactive.MockClientHttpRequest;
042import org.springframework.mock.http.client.reactive.MockClientHttpResponse;
043import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
044import org.springframework.mock.http.server.reactive.MockServerHttpResponse;
045import org.springframework.util.Assert;
046import org.springframework.util.MultiValueMap;
047
048/**
049 * Connector that handles requests by invoking an {@link HttpHandler} rather
050 * than making actual requests to a network socket.
051 *
052 * <p>Internally the connector uses and adapts<br>
053 * {@link MockClientHttpRequest} and {@link MockClientHttpResponse} to<br>
054 * {@link MockServerHttpRequest} and {@link MockServerHttpResponse}.
055 *
056 * @author Rossen Stoyanchev
057 * @since 5.0
058 */
059public class HttpHandlerConnector implements ClientHttpConnector {
060
061        private static Log logger = LogFactory.getLog(HttpHandlerConnector.class);
062
063        private final HttpHandler handler;
064
065
066        /**
067         * Constructor with the {@link HttpHandler} to handle requests with.
068         */
069        public HttpHandlerConnector(HttpHandler handler) {
070                Assert.notNull(handler, "HttpHandler is required");
071                this.handler = handler;
072        }
073
074
075        @Override
076        public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri,
077                        Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
078
079                return Mono.defer(() -> doConnect(httpMethod, uri, requestCallback))
080                                .subscribeOn(Schedulers.parallel());
081        }
082
083        private Mono<ClientHttpResponse> doConnect(
084                        HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
085
086                MonoProcessor<Void> requestWriteCompletion = MonoProcessor.create();
087                MonoProcessor<Void> handlerCompletion = MonoProcessor.create();
088                ClientHttpResponse[] savedResponse = new ClientHttpResponse[1];
089
090                MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
091                MockServerHttpResponse mockServerResponse = new MockServerHttpResponse();
092
093                mockClientRequest.setWriteHandler(requestBody -> {
094                        log("Invoking HttpHandler for ", httpMethod, uri);
095                        ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody);
096                        ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
097                        this.handler.handle(mockServerRequest, responseToUse).subscribe(handlerCompletion);
098                        return Mono.empty();
099                });
100
101                mockServerResponse.setWriteHandler(responseBody ->
102                                Mono.fromRunnable(() -> {
103                                        log("Creating client response for ", httpMethod, uri);
104                                        savedResponse[0] = adaptResponse(mockServerResponse, responseBody);
105                                }));
106
107                log("Writing client request for ", httpMethod, uri);
108                requestCallback.apply(mockClientRequest).subscribe(requestWriteCompletion);
109
110                return Mono.when(requestWriteCompletion, handlerCompletion)
111                                .onErrorMap(ex -> {
112                                        ClientHttpResponse response = savedResponse[0];
113                                        return response != null ? new FailureAfterResponseCompletedException(response, ex) : ex;
114                                })
115                                .then(Mono.fromCallable(() -> savedResponse[0] != null ?
116                                                savedResponse[0] : adaptResponse(mockServerResponse, Flux.empty())));
117        }
118
119        private void log(String message, HttpMethod httpMethod, URI uri) {
120                if (logger.isDebugEnabled()) {
121                        logger.debug(String.format("%s %s \"%s\"", message, httpMethod, uri));
122                }
123        }
124
125        private ServerHttpRequest adaptRequest(MockClientHttpRequest request, Publisher<DataBuffer> body) {
126                HttpMethod method = request.getMethod();
127                URI uri = request.getURI();
128                HttpHeaders headers = request.getHeaders();
129                MultiValueMap<String, HttpCookie> cookies = request.getCookies();
130                return MockServerHttpRequest.method(method, uri).headers(headers).cookies(cookies).body(body);
131        }
132
133        private ServerHttpResponse prepareResponse(ServerHttpResponse response, ServerHttpRequest request) {
134                return (request.getMethod() == HttpMethod.HEAD ? new HttpHeadResponseDecorator(response) : response);
135        }
136
137        private ClientHttpResponse adaptResponse(MockServerHttpResponse response, Flux<DataBuffer> body) {
138                Integer status = response.getRawStatusCode();
139                MockClientHttpResponse clientResponse = new MockClientHttpResponse((status != null) ? status : 200);
140                clientResponse.getHeaders().putAll(response.getHeaders());
141                clientResponse.getCookies().putAll(response.getCookies());
142                clientResponse.setBody(body);
143                return clientResponse;
144        }
145
146
147        /**
148         * Indicates that an error occurred after the server response was completed,
149         * via {@link ServerHttpResponse#writeWith} or {@link ServerHttpResponse#setComplete()},
150         * and can no longer be changed. This exception wraps the error and also
151         * provides {@link #getCompletedResponse() access} to the response.
152         * <p>What happens on an actual running server depends on when the server
153         * commits the response and the error may or may not change the response.
154         * Therefore in tests without a server the exception is wrapped and allowed
155         * to propagate so the application is alerted.
156         * @since 5.2.2
157         */
158        @SuppressWarnings("serial")
159        public static final class FailureAfterResponseCompletedException extends RuntimeException {
160
161                private final ClientHttpResponse completedResponse;
162
163
164                private FailureAfterResponseCompletedException(ClientHttpResponse response, Throwable cause) {
165                        super("Error occurred after response was completed: " + response, cause);
166                        this.completedResponse = response;
167                }
168
169
170                public ClientHttpResponse getCompletedResponse() {
171                        return this.completedResponse;
172                }
173        }
174
175}