001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.client.reactive; 018 019import java.net.URI; 020import java.util.concurrent.atomic.AtomicReference; 021import java.util.function.Function; 022 023import reactor.core.publisher.Mono; 024import reactor.netty.NettyOutbound; 025import reactor.netty.http.client.HttpClient; 026import reactor.netty.http.client.HttpClientRequest; 027import reactor.netty.resources.ConnectionProvider; 028import reactor.netty.resources.LoopResources; 029 030import org.springframework.http.HttpMethod; 031import org.springframework.util.Assert; 032 033/** 034 * Reactor-Netty implementation of {@link ClientHttpConnector}. 035 * 036 * @author Brian Clozel 037 * @author Rossen Stoyanchev 038 * @since 5.0 039 * @see reactor.netty.http.client.HttpClient 040 */ 041public class ReactorClientHttpConnector implements ClientHttpConnector { 042 043 private final static Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true); 044 045 046 private final HttpClient httpClient; 047 048 049 /** 050 * Default constructor. Initializes {@link HttpClient} via: 051 * <pre class="code"> 052 * HttpClient.create().compress() 053 * </pre> 054 */ 055 public ReactorClientHttpConnector() { 056 this.httpClient = defaultInitializer.apply(HttpClient.create()); 057 } 058 059 /** 060 * Constructor with externally managed Reactor Netty resources, including 061 * {@link LoopResources} for event loop threads, and {@link ConnectionProvider} 062 * for the connection pool. 063 * <p>This constructor should be used only when you don't want the client 064 * to participate in the Reactor Netty global resources. By default the 065 * client participates in the Reactor Netty global resources held in 066 * {@link reactor.netty.http.HttpResources}, which is recommended since 067 * fixed, shared resources are favored for event loop concurrency. However, 068 * consider declaring a {@link ReactorResourceFactory} bean with 069 * {@code globalResources=true} in order to ensure the Reactor Netty global 070 * resources are shut down when the Spring ApplicationContext is closed. 071 * @param factory the resource factory to obtain the resources from 072 * @param mapper a mapper for further initialization of the created client 073 * @since 5.1 074 */ 075 public ReactorClientHttpConnector(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) { 076 this.httpClient = defaultInitializer.andThen(mapper).apply(initHttpClient(factory)); 077 } 078 079 private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory) { 080 ConnectionProvider provider = resourceFactory.getConnectionProvider(); 081 LoopResources resources = resourceFactory.getLoopResources(); 082 Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?"); 083 Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?"); 084 return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources)); 085 } 086 087 /** 088 * Constructor with a pre-configured {@code HttpClient} instance. 089 * @param httpClient the client to use 090 * @since 5.1 091 */ 092 public ReactorClientHttpConnector(HttpClient httpClient) { 093 Assert.notNull(httpClient, "HttpClient is required"); 094 this.httpClient = httpClient; 095 } 096 097 098 @Override 099 public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, 100 Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { 101 102 if (!uri.isAbsolute()) { 103 return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri)); 104 } 105 106 AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>(); 107 108 return this.httpClient 109 .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())) 110 .uri(uri.toString()) 111 .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) 112 .responseConnection((response, connection) -> { 113 responseRef.set(new ReactorClientHttpResponse(response, connection)); 114 return Mono.just((ClientHttpResponse) responseRef.get()); 115 }) 116 .next() 117 .doOnCancel(() -> { 118 ReactorClientHttpResponse response = responseRef.get(); 119 if (response != null) { 120 response.releaseAfterCancel(method); 121 } 122 }); 123 } 124 125 private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, 126 NettyOutbound nettyOutbound) { 127 128 return new ReactorClientHttpRequest(method, uri, request, nettyOutbound); 129 } 130 131}