001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.client.reactive;
018
019import java.net.URI;
020import java.util.function.Consumer;
021import java.util.function.Function;
022
023import org.eclipse.jetty.client.HttpClient;
024import org.eclipse.jetty.client.api.Request;
025import org.eclipse.jetty.reactive.client.ContentChunk;
026import org.eclipse.jetty.reactive.client.ReactiveRequest;
027import reactor.core.publisher.Flux;
028import reactor.core.publisher.Mono;
029
030import org.springframework.core.io.buffer.DataBuffer;
031import org.springframework.core.io.buffer.DataBufferFactory;
032import org.springframework.core.io.buffer.DefaultDataBufferFactory;
033import org.springframework.http.HttpMethod;
034import org.springframework.lang.Nullable;
035import org.springframework.util.Assert;
036
037/**
038 * {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient.
039 *
040 * @author Sebastien Deleuze
041 * @since 5.1
042 * @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">Jetty ReactiveStreams HttpClient</a>
043 */
044public class JettyClientHttpConnector implements ClientHttpConnector {
045
046        private final HttpClient httpClient;
047
048        private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
049
050
051        /**
052         * Default constructor that creates a new instance of {@link HttpClient}.
053         */
054        public JettyClientHttpConnector() {
055                this(new HttpClient());
056        }
057
058        /**
059         * Constructor with an initialized {@link HttpClient}.
060         */
061        public JettyClientHttpConnector(HttpClient httpClient) {
062                this(httpClient, null);
063        }
064
065        /**
066         * Constructor with an initialized {@link HttpClient} and configures it
067         * with the given {@link JettyResourceFactory}.
068         * @param httpClient the {@link HttpClient} to use
069         * @param resourceFactory the {@link JettyResourceFactory} to use
070         * @since 5.2
071         */
072        public JettyClientHttpConnector(HttpClient httpClient, @Nullable JettyResourceFactory resourceFactory) {
073                Assert.notNull(httpClient, "HttpClient is required");
074                if (resourceFactory != null) {
075                        httpClient.setExecutor(resourceFactory.getExecutor());
076                        httpClient.setByteBufferPool(resourceFactory.getByteBufferPool());
077                        httpClient.setScheduler(resourceFactory.getScheduler());
078                }
079                this.httpClient = httpClient;
080        }
081
082        /**
083         * Constructor with an {@link JettyResourceFactory} that will manage shared resources.
084         * @param resourceFactory the {@link JettyResourceFactory} to use
085         * @param customizer the lambda used to customize the {@link HttpClient}
086         * @deprecated as of 5.2, in favor of
087         * {@link JettyClientHttpConnector#JettyClientHttpConnector(HttpClient, JettyResourceFactory)}
088         */
089        @Deprecated
090        public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer<HttpClient> customizer) {
091                this(new HttpClient(), resourceFactory);
092                if (customizer != null) {
093                        customizer.accept(this.httpClient);
094                }
095        }
096
097
098        public void setBufferFactory(DataBufferFactory bufferFactory) {
099                this.bufferFactory = bufferFactory;
100        }
101
102
103        @Override
104        public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
105                        Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
106
107                if (!uri.isAbsolute()) {
108                        return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
109                }
110
111                if (!this.httpClient.isStarted()) {
112                        try {
113                                this.httpClient.start();
114                        }
115                        catch (Exception ex) {
116                                return Mono.error(ex);
117                        }
118                }
119
120                Request request = this.httpClient.newRequest(uri).method(method.toString());
121
122                return requestCallback.apply(new JettyClientHttpRequest(request, this.bufferFactory))
123                                .then(Mono.fromDirect(ReactiveRequest.newBuilder(request).build()
124                                                .response((reactiveResponse, chunkPublisher) -> {
125                                                        Flux<DataBuffer> content = Flux.from(chunkPublisher).map(this::toDataBuffer);
126                                                        return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
127                                                })));
128        }
129
130        private DataBuffer toDataBuffer(ContentChunk chunk) {
131
132                // We must copy until this is resolved:
133                // https://github.com/eclipse/jetty.project/issues/2429
134
135                // Use copy instead of buffer wrapping because Callback#succeeded() is
136                // used not only to release the buffer but also to request more data
137                // which is a problem for codecs that buffer data.
138
139                DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
140                buffer.write(chunk.buffer);
141                chunk.callback.succeeded();
142                return buffer;
143        }
144
145}