001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.client.reactive; 018 019import java.net.URI; 020import java.util.function.Consumer; 021import java.util.function.Function; 022 023import org.eclipse.jetty.client.HttpClient; 024import org.eclipse.jetty.client.api.Request; 025import org.eclipse.jetty.reactive.client.ContentChunk; 026import org.eclipse.jetty.reactive.client.ReactiveRequest; 027import reactor.core.publisher.Flux; 028import reactor.core.publisher.Mono; 029 030import org.springframework.core.io.buffer.DataBuffer; 031import org.springframework.core.io.buffer.DataBufferFactory; 032import org.springframework.core.io.buffer.DefaultDataBufferFactory; 033import org.springframework.http.HttpMethod; 034import org.springframework.lang.Nullable; 035import org.springframework.util.Assert; 036 037/** 038 * {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient. 039 * 040 * @author Sebastien Deleuze 041 * @since 5.1 042 * @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a> 043 */ 044public class JettyClientHttpConnector implements ClientHttpConnector { 045 046 private final HttpClient httpClient; 047 048 private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); 049 050 051 /** 052 * Default constructor that creates a new instance of {@link HttpClient}. 053 */ 054 public JettyClientHttpConnector() { 055 this(new HttpClient()); 056 } 057 058 /** 059 * Constructor with an initialized {@link HttpClient}. 060 */ 061 public JettyClientHttpConnector(HttpClient httpClient) { 062 this(httpClient, null); 063 } 064 065 /** 066 * Constructor with an initialized {@link HttpClient} and configures it 067 * with the given {@link JettyResourceFactory}. 068 * @param httpClient the {@link HttpClient} to use 069 * @param resourceFactory the {@link JettyResourceFactory} to use 070 * @since 5.2 071 */ 072 public JettyClientHttpConnector(HttpClient httpClient, @Nullable JettyResourceFactory resourceFactory) { 073 Assert.notNull(httpClient, "HttpClient is required"); 074 if (resourceFactory != null) { 075 httpClient.setExecutor(resourceFactory.getExecutor()); 076 httpClient.setByteBufferPool(resourceFactory.getByteBufferPool()); 077 httpClient.setScheduler(resourceFactory.getScheduler()); 078 } 079 this.httpClient = httpClient; 080 } 081 082 /** 083 * Constructor with an {@link JettyResourceFactory} that will manage shared resources. 084 * @param resourceFactory the {@link JettyResourceFactory} to use 085 * @param customizer the lambda used to customize the {@link HttpClient} 086 * @deprecated as of 5.2, in favor of 087 * {@link JettyClientHttpConnector#JettyClientHttpConnector(HttpClient, JettyResourceFactory)} 088 */ 089 @Deprecated 090 public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer<HttpClient> customizer) { 091 this(new HttpClient(), resourceFactory); 092 if (customizer != null) { 093 customizer.accept(this.httpClient); 094 } 095 } 096 097 098 public void setBufferFactory(DataBufferFactory bufferFactory) { 099 this.bufferFactory = bufferFactory; 100 } 101 102 103 @Override 104 public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, 105 Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { 106 107 if (!uri.isAbsolute()) { 108 return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri)); 109 } 110 111 if (!this.httpClient.isStarted()) { 112 try { 113 this.httpClient.start(); 114 } 115 catch (Exception ex) { 116 return Mono.error(ex); 117 } 118 } 119 120 Request request = this.httpClient.newRequest(uri).method(method.toString()); 121 122 return requestCallback.apply(new JettyClientHttpRequest(request, this.bufferFactory)) 123 .then(Mono.fromDirect(ReactiveRequest.newBuilder(request).build() 124 .response((reactiveResponse, chunkPublisher) -> { 125 Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer); 126 return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); 127 }))); 128 } 129 130 private DataBuffer toDataBuffer(ContentChunk chunk) { 131 132 // We must copy until this is resolved: 133 // https://github.com/eclipse/jetty.project/issues/2429 134 135 // Use copy instead of buffer wrapping because Callback#succeeded() is 136 // used not only to release the buffer but also to request more data 137 // which is a problem for codecs that buffer data. 138 139 DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity()); 140 buffer.write(chunk.buffer); 141 chunk.callback.succeeded(); 142 return buffer; 143 } 144 145}