001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.tcp.reactor; 018 019import java.lang.reflect.Method; 020import java.net.InetSocketAddress; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.List; 024import java.util.Properties; 025 026import io.netty.channel.EventLoopGroup; 027import io.netty.channel.nio.NioEventLoopGroup; 028import io.netty.util.concurrent.Future; 029import io.netty.util.concurrent.FutureListener; 030import org.reactivestreams.Publisher; 031import reactor.Environment; 032import reactor.core.config.ConfigurationReader; 033import reactor.core.config.DispatcherConfiguration; 034import reactor.core.config.ReactorConfiguration; 035import reactor.core.support.NamedDaemonThreadFactory; 036import reactor.fn.Consumer; 037import reactor.fn.Function; 038import reactor.fn.Supplier; 039import reactor.fn.tuple.Tuple; 040import reactor.fn.tuple.Tuple2; 041import reactor.io.buffer.Buffer; 042import reactor.io.codec.Codec; 043import reactor.io.net.ChannelStream; 044import reactor.io.net.NetStreams; 045import reactor.io.net.NetStreams.TcpClientFactory; 046import reactor.io.net.ReactorChannelHandler; 047import reactor.io.net.Reconnect; 048import reactor.io.net.Spec.TcpClientSpec; 049import reactor.io.net.config.ClientSocketOptions; 050import reactor.io.net.impl.netty.NettyClientSocketOptions; 051import reactor.io.net.impl.netty.tcp.NettyTcpClient; 052import reactor.io.net.tcp.TcpClient; 053import reactor.rx.Promise; 054import reactor.rx.Promises; 055import reactor.rx.Stream; 056import reactor.rx.Streams; 057import reactor.rx.action.Signal; 058 059import org.springframework.messaging.Message; 060import org.springframework.messaging.tcp.ReconnectStrategy; 061import org.springframework.messaging.tcp.TcpConnectionHandler; 062import org.springframework.messaging.tcp.TcpOperations; 063import org.springframework.util.Assert; 064import org.springframework.util.ReflectionUtils; 065import org.springframework.util.concurrent.ListenableFuture; 066 067/** 068 * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} 069 * based on the TCP client support of project Reactor. 070 * 071 * <p>This implementation wraps N Reactor {@code TcpClient} instances created 072 * for N {@link #connect} calls, i.e. once instance per connection. 073 * 074 * @author Rossen Stoyanchev 075 * @author Stephane Maldini 076 * @since 4.2 077 */ 078public class Reactor2TcpClient<P> implements TcpOperations<P> { 079 080 @SuppressWarnings("rawtypes") 081 public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; 082 083 private static final Method eventLoopGroupMethod = initEventLoopGroupMethod(); 084 085 086 private final EventLoopGroup eventLoopGroup; 087 088 private final Environment environment; 089 090 private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory; 091 092 private final List<TcpClient<Message<P>, Message<P>>> tcpClients = 093 new ArrayList<TcpClient<Message<P>, Message<P>>>(); 094 095 private boolean stopping; 096 097 098 /** 099 * A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory 100 * with a default {@link reactor.core.dispatch.SynchronousDispatcher}, i.e. 101 * relying on Netty threads. The number of Netty threads can be tweaked with 102 * the {@code reactor.tcp.ioThreadCount} System property. The network I/O 103 * threads will be shared amongst the active clients. 104 * 105 * @param host the host to connect to 106 * @param port the port to connect to 107 * @param codec the codec to use for encoding and decoding the TCP stream 108 */ 109 public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) { 110 this(new FixedAddressSupplier(host, port), codec); 111 } 112 113 /** 114 * A variant of {@link #Reactor2TcpClient(String, int, Codec)} that takes a 115 * supplier of any number of addresses instead of just one host and port. 116 * This can be used to {@link #connect(TcpConnectionHandler, ReconnectStrategy) 117 * reconnect} to a different address after the current host becomes unavailable. 118 * 119 * @param addressSupplier supplier of addresses to use for connecting 120 * @param codec the codec to use for encoding and decoding the TCP stream 121 * @since 4.3.15 122 */ 123 public Reactor2TcpClient(final Supplier<InetSocketAddress> addressSupplier, 124 final Codec<Buffer, Message<P>, Message<P>> codec) { 125 126 // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup 127 final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup(); 128 this.eventLoopGroup = nioEventLoopGroup; 129 this.environment = new Environment(new SynchronousDispatcherConfigReader()); 130 131 this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() { 132 @Override 133 public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) { 134 return spec 135 .env(environment) 136 .codec(codec) 137 .connect(addressSupplier) 138 .options(createClientSocketOptions()); 139 } 140 141 private ClientSocketOptions createClientSocketOptions() { 142 return (ClientSocketOptions) ReflectionUtils.invokeMethod(eventLoopGroupMethod, 143 new NettyClientSocketOptions(), nioEventLoopGroup); 144 } 145 }; 146 } 147 148 /** 149 * A constructor with a pre-configured {@link TcpClientSpec} {@link Function} 150 * factory. This might be used to add SSL or specific network parameters to 151 * the generated client configuration. 152 * 153 * <p><strong>NOTE:</strong> if the client is configured with a thread-creating 154 * dispatcher, you are responsible for cleaning them, e.g. via 155 * {@link reactor.core.Dispatcher#shutdown}. 156 * 157 * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for 158 * each client creation 159 */ 160 public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) { 161 Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); 162 this.tcpClientSpecFactory = tcpClientSpecFactory; 163 this.eventLoopGroup = null; 164 this.environment = null; 165 } 166 167 168 public static NioEventLoopGroup initEventLoopGroup() { 169 int ioThreadCount; 170 try { 171 ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); 172 } 173 catch (Throwable ex) { 174 ioThreadCount = -1; 175 } 176 if (ioThreadCount <= 0) { 177 ioThreadCount = Runtime.getRuntime().availableProcessors(); 178 } 179 return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io")); 180 } 181 182 183 @Override 184 public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) { 185 Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); 186 187 final TcpClient<Message<P>, Message<P>> tcpClient; 188 final Runnable cleanupTask; 189 synchronized (this.tcpClients) { 190 if (this.stopping) { 191 IllegalStateException ex = new IllegalStateException("Shutting down."); 192 connectionHandler.afterConnectFailure(ex); 193 return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex)); 194 } 195 tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); 196 this.tcpClients.add(tcpClient); 197 cleanupTask = new Runnable() { 198 @Override 199 public void run() { 200 synchronized (tcpClients) { 201 tcpClients.remove(tcpClient); 202 } 203 } 204 }; 205 } 206 207 Promise<Void> promise = tcpClient.start( 208 new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask)); 209 210 return new PassThroughPromiseToListenableFutureAdapter<Void>( 211 promise.onError(new Consumer<Throwable>() { 212 @Override 213 public void accept(Throwable ex) { 214 cleanupTask.run(); 215 connectionHandler.afterConnectFailure(ex); 216 } 217 }) 218 ); 219 } 220 221 @Override 222 public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) { 223 Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); 224 Assert.notNull(strategy, "ReconnectStrategy must not be null"); 225 226 final TcpClient<Message<P>, Message<P>> tcpClient; 227 Runnable cleanupTask; 228 synchronized (this.tcpClients) { 229 if (this.stopping) { 230 IllegalStateException ex = new IllegalStateException("Shutting down."); 231 connectionHandler.afterConnectFailure(ex); 232 return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex)); 233 } 234 tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); 235 this.tcpClients.add(tcpClient); 236 cleanupTask = new Runnable() { 237 @Override 238 public void run() { 239 synchronized (tcpClients) { 240 tcpClients.remove(tcpClient); 241 } 242 } 243 }; 244 } 245 246 Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start( 247 new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask), 248 new ReactorReconnectAdapter(strategy)); 249 250 return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after()); 251 } 252 253 @Override 254 public ListenableFuture<Void> shutdown() { 255 synchronized (this.tcpClients) { 256 this.stopping = true; 257 } 258 259 Promise<Void> promise = Streams.from(this.tcpClients) 260 .flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Void>>() { 261 @Override 262 public Promise<Void> apply(final TcpClient<Message<P>, Message<P>> client) { 263 return client.shutdown().onComplete(new Consumer<Promise<Void>>() { 264 @Override 265 public void accept(Promise<Void> voidPromise) { 266 tcpClients.remove(client); 267 } 268 }); 269 } 270 }) 271 .next(); 272 273 if (this.eventLoopGroup != null) { 274 final Promise<Void> eventLoopPromise = Promises.prepare(); 275 promise.onComplete(new Consumer<Promise<Void>>() { 276 @Override 277 public void accept(Promise<Void> voidPromise) { 278 eventLoopGroup.shutdownGracefully().addListener(new FutureListener<Object>() { 279 @Override 280 public void operationComplete(Future<Object> future) throws Exception { 281 if (future.isSuccess()) { 282 eventLoopPromise.onComplete(); 283 } 284 else { 285 eventLoopPromise.onError(future.cause()); 286 } 287 } 288 }); 289 } 290 }); 291 promise = eventLoopPromise; 292 } 293 294 if (this.environment != null) { 295 promise.onComplete(new Consumer<Promise<Void>>() { 296 @Override 297 public void accept(Promise<Void> voidPromise) { 298 environment.shutdown(); 299 } 300 }); 301 } 302 303 return new PassThroughPromiseToListenableFutureAdapter<Void>(promise); 304 } 305 306 307 private static Method initEventLoopGroupMethod() { 308 for (Method method : NettyClientSocketOptions.class.getMethods()) { 309 if (method.getName().equals("eventLoopGroup") && method.getParameterTypes().length == 1) { 310 return method; 311 } 312 } 313 throw new IllegalStateException("No compatible Reactor version found"); 314 } 315 316 317 private static class FixedAddressSupplier implements Supplier<InetSocketAddress> { 318 319 private final InetSocketAddress address; 320 321 FixedAddressSupplier(String host, int port) { 322 this.address = new InetSocketAddress(host, port); 323 } 324 325 @Override 326 public InetSocketAddress get() { 327 return this.address; 328 } 329 } 330 331 332 private static class SynchronousDispatcherConfigReader implements ConfigurationReader { 333 334 @Override 335 public ReactorConfiguration read() { 336 return new ReactorConfiguration( 337 Collections.<DispatcherConfiguration>emptyList(), "sync", new Properties()); 338 } 339 } 340 341 342 private static class MessageChannelStreamHandler<P> 343 implements ReactorChannelHandler<Message<P>, Message<P>, ChannelStream<Message<P>, Message<P>>> { 344 345 private final TcpConnectionHandler<P> connectionHandler; 346 347 private final Runnable cleanupTask; 348 349 public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) { 350 this.connectionHandler = connectionHandler; 351 this.cleanupTask = cleanupTask; 352 } 353 354 @Override 355 public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) { 356 Promise<Void> closePromise = Promises.prepare(); 357 this.connectionHandler.afterConnected(new Reactor2TcpConnection<P>(channelStream, closePromise)); 358 channelStream 359 .finallyDo(new Consumer<Signal<Message<P>>>() { 360 @Override 361 public void accept(Signal<Message<P>> signal) { 362 cleanupTask.run(); 363 if (signal.isOnError()) { 364 connectionHandler.handleFailure(signal.getThrowable()); 365 } 366 else if (signal.isOnComplete()) { 367 connectionHandler.afterConnectionClosed(); 368 } 369 } 370 }) 371 .consume(new Consumer<Message<P>>() { 372 @Override 373 public void accept(Message<P> message) { 374 connectionHandler.handleMessage(message); 375 } 376 }); 377 378 return closePromise; 379 } 380 } 381 382 383 private static class ReactorReconnectAdapter implements Reconnect { 384 385 private final ReconnectStrategy strategy; 386 387 public ReactorReconnectAdapter(ReconnectStrategy strategy) { 388 this.strategy = strategy; 389 } 390 391 @Override 392 public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) { 393 return Tuple.of(address, this.strategy.getTimeToNextAttempt(attempt)); 394 } 395 } 396 397}