001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.client.reactive;
018
019import java.time.Duration;
020import java.util.function.Consumer;
021import java.util.function.Supplier;
022
023import reactor.netty.http.HttpResources;
024import reactor.netty.resources.ConnectionProvider;
025import reactor.netty.resources.LoopResources;
026
027import org.springframework.beans.factory.DisposableBean;
028import org.springframework.beans.factory.InitializingBean;
029import org.springframework.lang.Nullable;
030import org.springframework.util.Assert;
031
032/**
033 * Factory to manage Reactor Netty resources, i.e. {@link LoopResources} for
034 * event loop threads, and {@link ConnectionProvider} for the connection pool,
035 * within the lifecycle of a Spring {@code ApplicationContext}.
036 *
037 * <p>This factory implements {@link InitializingBean} and {@link DisposableBean}
038 * and is expected typically to be declared as a Spring-managed bean.
039 *
040 * @author Rossen Stoyanchev
041 * @author Brian Clozel
042 * @since 5.1
043 */
044public class ReactorResourceFactory implements InitializingBean, DisposableBean {
045
046        private boolean useGlobalResources = true;
047
048        @Nullable
049        private Consumer<HttpResources> globalResourcesConsumer;
050
051        @SuppressWarnings("deprecation")
052        private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.fixed("webflux", 500);
053
054        @Nullable
055        private ConnectionProvider connectionProvider;
056
057        private Supplier<LoopResources> loopResourcesSupplier = () -> LoopResources.create("webflux-http");
058
059        @Nullable
060        private LoopResources loopResources;
061
062        private boolean manageConnectionProvider = false;
063
064        private boolean manageLoopResources = false;
065
066        private Duration shutdownQuietPeriod = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD);
067
068        private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT);
069
070
071        /**
072         * Whether to use global Reactor Netty resources via {@link HttpResources}.
073         * <p>Default is "true" in which case this factory initializes and stops the
074         * global Reactor Netty resources within Spring's {@code ApplicationContext}
075         * lifecycle. If set to "false" the factory manages its resources independent
076         * of the global ones.
077         * @param useGlobalResources whether to expose and manage the global resources
078         * @see #addGlobalResourcesConsumer(Consumer)
079         */
080        public void setUseGlobalResources(boolean useGlobalResources) {
081                this.useGlobalResources = useGlobalResources;
082        }
083
084        /**
085         * Whether this factory exposes the global
086         * {@link reactor.netty.http.HttpResources HttpResources} holder.
087         */
088        public boolean isUseGlobalResources() {
089                return this.useGlobalResources;
090        }
091
092        /**
093         * Add a Consumer for configuring the global Reactor Netty resources on
094         * startup. When this option is used, {@link #setUseGlobalResources} is also
095         * enabled.
096         * @param consumer the consumer to apply
097         * @see #setUseGlobalResources(boolean)
098         */
099        public void addGlobalResourcesConsumer(Consumer<HttpResources> consumer) {
100                this.useGlobalResources = true;
101                this.globalResourcesConsumer = (this.globalResourcesConsumer != null ?
102                                this.globalResourcesConsumer.andThen(consumer) : consumer);
103        }
104
105        /**
106         * Use this when you don't want to participate in global resources and
107         * you want to customize the creation of the managed {@code ConnectionProvider}.
108         * <p>By default, {@code ConnectionProvider.elastic("http")} is used.
109         * <p>Note that this option is ignored if {@code userGlobalResources=false} or
110         * {@link #setConnectionProvider(ConnectionProvider)} is set.
111         * @param supplier the supplier to use
112         */
113        public void setConnectionProviderSupplier(Supplier<ConnectionProvider> supplier) {
114                this.connectionProviderSupplier = supplier;
115        }
116
117        /**
118         * Use this when you want to provide an externally managed
119         * {@link ConnectionProvider} instance.
120         * @param connectionProvider the connection provider to use as is
121         */
122        public void setConnectionProvider(ConnectionProvider connectionProvider) {
123                this.connectionProvider = connectionProvider;
124        }
125
126        /**
127         * Return the configured {@link ConnectionProvider}.
128         */
129        public ConnectionProvider getConnectionProvider() {
130                Assert.state(this.connectionProvider != null, "ConnectionProvider not initialized yet");
131                return this.connectionProvider;
132        }
133
134        /**
135         * Use this when you don't want to participate in global resources and
136         * you want to customize the creation of the managed {@code LoopResources}.
137         * <p>By default, {@code LoopResources.create("reactor-http")} is used.
138         * <p>Note that this option is ignored if {@code userGlobalResources=false} or
139         * {@link #setLoopResources(LoopResources)} is set.
140         * @param supplier the supplier to use
141         */
142        public void setLoopResourcesSupplier(Supplier<LoopResources> supplier) {
143                this.loopResourcesSupplier = supplier;
144        }
145
146        /**
147         * Use this option when you want to provide an externally managed
148         * {@link LoopResources} instance.
149         * @param loopResources the loop resources to use as is
150         */
151        public void setLoopResources(LoopResources loopResources) {
152                this.loopResources = loopResources;
153        }
154
155        /**
156         * Return the configured {@link LoopResources}.
157         */
158        public LoopResources getLoopResources() {
159                Assert.state(this.loopResources != null, "LoopResources not initialized yet");
160                return this.loopResources;
161        }
162
163        /**
164         * Configure the amount of time we'll wait before shutting down resources.
165         * If a task is submitted during the {@code shutdownQuietPeriod}, it is guaranteed
166         * to be accepted and the {@code shutdownQuietPeriod} will start over.
167         * <p>By default, this is set to
168         * {@link LoopResources#DEFAULT_SHUTDOWN_QUIET_PERIOD} which is 2 seconds but
169         * can also be overridden with the system property
170         * {@link reactor.netty.ReactorNetty#SHUTDOWN_QUIET_PERIOD
171         * ReactorNetty.SHUTDOWN_QUIET_PERIOD}.
172         * @since 5.2.4
173         * @see #setShutdownTimeout(Duration)
174         */
175        public void setShutdownQuietPeriod(Duration shutdownQuietPeriod) {
176                Assert.notNull(shutdownQuietPeriod, "shutdownQuietPeriod should not be null");
177                this.shutdownQuietPeriod = shutdownQuietPeriod;
178        }
179
180        /**
181         * Configure the maximum amount of time to wait until the disposal of the
182         * underlying resources regardless if a task was submitted during the
183         * {@code shutdownQuietPeriod}.
184         * <p>By default, this is set to
185         * {@link LoopResources#DEFAULT_SHUTDOWN_TIMEOUT} which is 15 seconds but
186         * can also be overridden with the system property
187         * {@link reactor.netty.ReactorNetty#SHUTDOWN_TIMEOUT
188         * ReactorNetty.SHUTDOWN_TIMEOUT}.
189         * @since 5.2.4
190         * @see #setShutdownQuietPeriod(Duration)
191         */
192        public void setShutdownTimeout(Duration shutdownTimeout) {
193                Assert.notNull(shutdownTimeout, "shutdownTimeout should not be null");
194                this.shutdownTimeout = shutdownTimeout;
195        }
196
197
198        @Override
199        public void afterPropertiesSet() {
200                if (this.useGlobalResources) {
201                        Assert.isTrue(this.loopResources == null && this.connectionProvider == null,
202                                        "'useGlobalResources' is mutually exclusive with explicitly configured resources");
203                        HttpResources httpResources = HttpResources.get();
204                        if (this.globalResourcesConsumer != null) {
205                                this.globalResourcesConsumer.accept(httpResources);
206                        }
207                        this.connectionProvider = httpResources;
208                        this.loopResources = httpResources;
209                }
210                else {
211                        if (this.loopResources == null) {
212                                this.manageLoopResources = true;
213                                this.loopResources = this.loopResourcesSupplier.get();
214                        }
215                        if (this.connectionProvider == null) {
216                                this.manageConnectionProvider = true;
217                                this.connectionProvider = this.connectionProviderSupplier.get();
218                        }
219                }
220        }
221
222        @Override
223        public void destroy() {
224                if (this.useGlobalResources) {
225                        HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
226                }
227                else {
228                        try {
229                                ConnectionProvider provider = this.connectionProvider;
230                                if (provider != null && this.manageConnectionProvider) {
231                                        provider.disposeLater().block();
232                                }
233                        }
234                        catch (Throwable ex) {
235                                // ignore
236                        }
237
238                        try {
239                                LoopResources resources = this.loopResources;
240                                if (resources != null && this.manageLoopResources) {
241                                        resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
242                                }
243                        }
244                        catch (Throwable ex) {
245                                // ignore
246                        }
247                }
248        }
249
250}