001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.client.reactive; 018 019import java.time.Duration; 020import java.util.function.Consumer; 021import java.util.function.Supplier; 022 023import reactor.netty.http.HttpResources; 024import reactor.netty.resources.ConnectionProvider; 025import reactor.netty.resources.LoopResources; 026 027import org.springframework.beans.factory.DisposableBean; 028import org.springframework.beans.factory.InitializingBean; 029import org.springframework.lang.Nullable; 030import org.springframework.util.Assert; 031 032/** 033 * Factory to manage Reactor Netty resources, i.e. {@link LoopResources} for 034 * event loop threads, and {@link ConnectionProvider} for the connection pool, 035 * within the lifecycle of a Spring {@code ApplicationContext}. 036 * 037 * <p>This factory implements {@link InitializingBean} and {@link DisposableBean} 038 * and is expected typically to be declared as a Spring-managed bean. 039 * 040 * @author Rossen Stoyanchev 041 * @author Brian Clozel 042 * @since 5.1 043 */ 044public class ReactorResourceFactory implements InitializingBean, DisposableBean { 045 046 private boolean useGlobalResources = true; 047 048 @Nullable 049 private Consumer<HttpResources> globalResourcesConsumer; 050 051 @SuppressWarnings("deprecation") 052 private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.fixed("webflux", 500); 053 054 @Nullable 055 private ConnectionProvider connectionProvider; 056 057 private Supplier<LoopResources> loopResourcesSupplier = () -> LoopResources.create("webflux-http"); 058 059 @Nullable 060 private LoopResources loopResources; 061 062 private boolean manageConnectionProvider = false; 063 064 private boolean manageLoopResources = false; 065 066 private Duration shutdownQuietPeriod = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_QUIET_PERIOD); 067 068 private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT); 069 070 071 /** 072 * Whether to use global Reactor Netty resources via {@link HttpResources}. 073 * <p>Default is "true" in which case this factory initializes and stops the 074 * global Reactor Netty resources within Spring's {@code ApplicationContext} 075 * lifecycle. If set to "false" the factory manages its resources independent 076 * of the global ones. 077 * @param useGlobalResources whether to expose and manage the global resources 078 * @see #addGlobalResourcesConsumer(Consumer) 079 */ 080 public void setUseGlobalResources(boolean useGlobalResources) { 081 this.useGlobalResources = useGlobalResources; 082 } 083 084 /** 085 * Whether this factory exposes the global 086 * {@link reactor.netty.http.HttpResources HttpResources} holder. 087 */ 088 public boolean isUseGlobalResources() { 089 return this.useGlobalResources; 090 } 091 092 /** 093 * Add a Consumer for configuring the global Reactor Netty resources on 094 * startup. When this option is used, {@link #setUseGlobalResources} is also 095 * enabled. 096 * @param consumer the consumer to apply 097 * @see #setUseGlobalResources(boolean) 098 */ 099 public void addGlobalResourcesConsumer(Consumer<HttpResources> consumer) { 100 this.useGlobalResources = true; 101 this.globalResourcesConsumer = (this.globalResourcesConsumer != null ? 102 this.globalResourcesConsumer.andThen(consumer) : consumer); 103 } 104 105 /** 106 * Use this when you don't want to participate in global resources and 107 * you want to customize the creation of the managed {@code ConnectionProvider}. 108 * <p>By default, {@code ConnectionProvider.elastic("http")} is used. 109 * <p>Note that this option is ignored if {@code userGlobalResources=false} or 110 * {@link #setConnectionProvider(ConnectionProvider)} is set. 111 * @param supplier the supplier to use 112 */ 113 public void setConnectionProviderSupplier(Supplier<ConnectionProvider> supplier) { 114 this.connectionProviderSupplier = supplier; 115 } 116 117 /** 118 * Use this when you want to provide an externally managed 119 * {@link ConnectionProvider} instance. 120 * @param connectionProvider the connection provider to use as is 121 */ 122 public void setConnectionProvider(ConnectionProvider connectionProvider) { 123 this.connectionProvider = connectionProvider; 124 } 125 126 /** 127 * Return the configured {@link ConnectionProvider}. 128 */ 129 public ConnectionProvider getConnectionProvider() { 130 Assert.state(this.connectionProvider != null, "ConnectionProvider not initialized yet"); 131 return this.connectionProvider; 132 } 133 134 /** 135 * Use this when you don't want to participate in global resources and 136 * you want to customize the creation of the managed {@code LoopResources}. 137 * <p>By default, {@code LoopResources.create("reactor-http")} is used. 138 * <p>Note that this option is ignored if {@code userGlobalResources=false} or 139 * {@link #setLoopResources(LoopResources)} is set. 140 * @param supplier the supplier to use 141 */ 142 public void setLoopResourcesSupplier(Supplier<LoopResources> supplier) { 143 this.loopResourcesSupplier = supplier; 144 } 145 146 /** 147 * Use this option when you want to provide an externally managed 148 * {@link LoopResources} instance. 149 * @param loopResources the loop resources to use as is 150 */ 151 public void setLoopResources(LoopResources loopResources) { 152 this.loopResources = loopResources; 153 } 154 155 /** 156 * Return the configured {@link LoopResources}. 157 */ 158 public LoopResources getLoopResources() { 159 Assert.state(this.loopResources != null, "LoopResources not initialized yet"); 160 return this.loopResources; 161 } 162 163 /** 164 * Configure the amount of time we'll wait before shutting down resources. 165 * If a task is submitted during the {@code shutdownQuietPeriod}, it is guaranteed 166 * to be accepted and the {@code shutdownQuietPeriod} will start over. 167 * <p>By default, this is set to 168 * {@link LoopResources#DEFAULT_SHUTDOWN_QUIET_PERIOD} which is 2 seconds but 169 * can also be overridden with the system property 170 * {@link reactor.netty.ReactorNetty#SHUTDOWN_QUIET_PERIOD 171 * ReactorNetty.SHUTDOWN_QUIET_PERIOD}. 172 * @since 5.2.4 173 * @see #setShutdownTimeout(Duration) 174 */ 175 public void setShutdownQuietPeriod(Duration shutdownQuietPeriod) { 176 Assert.notNull(shutdownQuietPeriod, "shutdownQuietPeriod should not be null"); 177 this.shutdownQuietPeriod = shutdownQuietPeriod; 178 } 179 180 /** 181 * Configure the maximum amount of time to wait until the disposal of the 182 * underlying resources regardless if a task was submitted during the 183 * {@code shutdownQuietPeriod}. 184 * <p>By default, this is set to 185 * {@link LoopResources#DEFAULT_SHUTDOWN_TIMEOUT} which is 15 seconds but 186 * can also be overridden with the system property 187 * {@link reactor.netty.ReactorNetty#SHUTDOWN_TIMEOUT 188 * ReactorNetty.SHUTDOWN_TIMEOUT}. 189 * @since 5.2.4 190 * @see #setShutdownQuietPeriod(Duration) 191 */ 192 public void setShutdownTimeout(Duration shutdownTimeout) { 193 Assert.notNull(shutdownTimeout, "shutdownTimeout should not be null"); 194 this.shutdownTimeout = shutdownTimeout; 195 } 196 197 198 @Override 199 public void afterPropertiesSet() { 200 if (this.useGlobalResources) { 201 Assert.isTrue(this.loopResources == null && this.connectionProvider == null, 202 "'useGlobalResources' is mutually exclusive with explicitly configured resources"); 203 HttpResources httpResources = HttpResources.get(); 204 if (this.globalResourcesConsumer != null) { 205 this.globalResourcesConsumer.accept(httpResources); 206 } 207 this.connectionProvider = httpResources; 208 this.loopResources = httpResources; 209 } 210 else { 211 if (this.loopResources == null) { 212 this.manageLoopResources = true; 213 this.loopResources = this.loopResourcesSupplier.get(); 214 } 215 if (this.connectionProvider == null) { 216 this.manageConnectionProvider = true; 217 this.connectionProvider = this.connectionProviderSupplier.get(); 218 } 219 } 220 } 221 222 @Override 223 public void destroy() { 224 if (this.useGlobalResources) { 225 HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); 226 } 227 else { 228 try { 229 ConnectionProvider provider = this.connectionProvider; 230 if (provider != null && this.manageConnectionProvider) { 231 provider.disposeLater().block(); 232 } 233 } 234 catch (Throwable ex) { 235 // ignore 236 } 237 238 try { 239 LoopResources resources = this.loopResources; 240 if (resources != null && this.manageLoopResources) { 241 resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); 242 } 243 } 244 catch (Throwable ex) { 245 // ignore 246 } 247 } 248 } 249 250}