/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.tcp.reactor;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
 * Reactor Netty based implementation of {@link TcpOperations}.
 *
 * @author Rossen Stoyanchev
 * @author Stephane Maldini
 * @since 5.0
 * @param <P> the type of payload for in and outbound messages
 */
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {

    /** Prefetch size passed to {@code publishOn} for inbound messages. */
    private static final int PUBLISH_ON_BUFFER_SIZE = 16;


    private final TcpClient tcpClient;

    private final ReactorNettyCodec<P> codec;

    // The three resources below are non-null only when this class created
    // them itself (first two constructors); shutdown() releases them.
    @Nullable
    private final ChannelGroup channelGroup;

    @Nullable
    private final LoopResources loopResources;

    @Nullable
    private final ConnectionProvider poolResources;

    // Used for handler callbacks, inbound message delivery and reconnect delays.
    private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler");

    private Log logger = LogFactory.getLog(ReactorNettyTcpClient.class);

    // Set once by shutdown(); connect attempts made afterwards fail immediately.
    private volatile boolean stopping = false;


    /**
     * Simple constructor with the host and port to use to connect to.
     * <p>This constructor manages the lifecycle of the {@link TcpClient} and
     * underlying resources such as {@link ConnectionProvider},
     * {@link LoopResources}, and {@link ChannelGroup}.
     * <p>For full control over the initialization and lifecycle of the
     * TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
     * @param host the host to connect to
     * @param port the port to connect to
     * @param codec for encoding and decoding the input/output byte streams
     * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
     */
    @SuppressWarnings("deprecation")
    public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
        Assert.notNull(host, "host is required");
        Assert.notNull(codec, "ReactorNettyCodec is required");

        this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        this.loopResources = LoopResources.create("tcp-client-loop");
        this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
        this.codec = codec;

        // Track every connected channel so that shutdown() can close them all.
        this.tcpClient = TcpClient.create(this.poolResources)
                .host(host).port(port)
                .runOn(this.loopResources, false)
                .doOnConnected(conn -> this.channelGroup.add(conn.channel()));
    }

    /**
     * A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
     * that still manages the lifecycle of the {@link TcpClient} and underlying
     * resources, but allows for direct configuration of other properties of the
     * client through a {@code Function<TcpClient, TcpClient>}.
     * @param clientConfigurer the configurer function
     * @param codec for encoding and decoding the input/output byte streams
     * @since 5.1.3
     * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
     */
    @SuppressWarnings("deprecation")
    public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
        // Validate before any resources are created, as the other constructors do.
        Assert.notNull(clientConfigurer, "Client configurer is required");
        Assert.notNull(codec, "ReactorNettyCodec is required");

        this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        this.loopResources = LoopResources.create("tcp-client-loop");
        this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
        this.codec = codec;

        // Track every connected channel so that shutdown() can close them all.
        this.tcpClient = clientConfigurer.apply(TcpClient
                .create(this.poolResources)
                .runOn(this.loopResources, false)
                .doOnConnected(conn -> this.channelGroup.add(conn.channel())));
    }

    /**
     * Constructor with an externally created {@link TcpClient} instance whose
     * lifecycle is expected to be managed externally.
     * @param tcpClient the TcpClient instance to use
     * @param codec for encoding and decoding the input/output byte streams
     * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
     */
    public ReactorNettyTcpClient(TcpClient tcpClient, ReactorNettyCodec<P> codec) {
        Assert.notNull(tcpClient, "TcpClient is required");
        Assert.notNull(codec, "ReactorNettyCodec is required");
        this.tcpClient = tcpClient;
        this.codec = codec;

        // Nothing is owned by this instance, hence nothing to release on shutdown.
        this.channelGroup = null;
        this.loopResources = null;
        this.poolResources = null;
    }


    /**
     * Set an alternative logger to use than the one based on the class name.
     * @param logger the logger to use
     * @since 5.1
     */
    public void setLogger(Log logger) {
        this.logger = logger;
    }

    /**
     * Return the currently configured Logger.
     * @since 5.1
     */
    public Log getLogger() {
        return this.logger;
    }


    /**
     * Connect once, without reconnecting. A connect failure is reported both to
     * the given handler and through the returned future.
     * @param handler the handler to notify of connection events; required
     * @return a future that reflects the outcome of the connect attempt
     */
    @Override
    public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
        Assert.notNull(handler, "TcpConnectionHandler is required");

        if (this.stopping) {
            return handleShuttingDownConnectFailure(handler);
        }

        Mono<Void> connectMono = this.tcpClient
                .handle(new ReactorNettyHandler(handler))
                .connect()
                .doOnError(handler::afterConnectFailure)
                .then();

        return new MonoToListenableFutureAdapter<>(connectMono);
    }

    /**
     * Connect and keep reconnecting as directed by the given strategy, both
     * after a failed connect and after an established connection is disposed.
     * The returned future reflects the first connect attempt only; every
     * connect failure is reported to the handler.
     * @param handler the handler to notify of connection events; required
     * @param strategy decides the delay before each reconnect attempt; required
     * @return a future that reflects the outcome of the first connect attempt
     */
    @Override
    @SuppressWarnings("deprecation")
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
        Assert.notNull(handler, "TcpConnectionHandler is required");
        Assert.notNull(strategy, "ReconnectStrategy is required");

        if (this.stopping) {
            return handleShuttingDownConnectFailure(handler);
        }

        // Report first connect to the ListenableFuture
        MonoProcessor<Void> connectMono = MonoProcessor.create();

        this.tcpClient
                .handle(new ReactorNettyHandler(handler))
                .connect()
                .doOnNext(updateConnectMono(connectMono))
                .doOnError(updateConnectMono(connectMono))
                .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
                .flatMap(Connection::onDispose)             // post-connect issues
                .retryWhen(reconnectFunction(strategy))
                .repeatWhen(reconnectFunction(strategy))
                .subscribe();

        return new MonoToListenableFutureAdapter<>(connectMono);
    }

    /**
     * Fail a connect attempt made after {@link #shutdown()} was called: the
     * handler is notified and the same exception is returned via the future.
     */
    private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
        IllegalStateException ex = new IllegalStateException("Shutting down.");
        handler.afterConnectFailure(ex);
        return new MonoToListenableFutureAdapter<>(Mono.error(ex));
    }

    /**
     * Create a consumer that settles the given processor at most once: with an
     * error if the consumed value is a {@link Throwable}, or with completion
     * otherwise. Signals received after the processor has terminated are ignored.
     */
    private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
        return o -> {
            if (!connectMono.isTerminated()) {
                if (o instanceof Throwable) {
                    connectMono.onError((Throwable) o);
                }
                else {
                    connectMono.onComplete();
                }
            }
        };
    }

    /**
     * Create the companion function passed to {@code retryWhen} and
     * {@code repeatWhen}. Signals are numbered with a running counter seeded
     * at 1, and for each number the strategy is asked for the time to the next
     * attempt. A non-null time produces a signal after that many milliseconds
     * on the client scheduler; a {@code null} time produces no signal.
     */
    private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
        return flux -> flux
                // "count + 1", not "count++": a post-increment evaluates to the
                // old value, which would leave the attempt number stuck at 1.
                .scan(1, (count, element) -> count + 1)
                .flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
                        .map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
                        .orElse(Mono.empty()));
    }

    /**
     * Shut down the client. When the channel group is managed by this instance,
     * its channels are closed, then the loop and pool resources are disposed,
     * and finally the scheduler is stopped; an error in one step does not
     * prevent the following steps. Otherwise only the scheduler is stopped.
     * Calling this method again returns an already completed future.
     * @return a future that reflects the outcome of the shutdown sequence
     */
    @Override
    public ListenableFuture<Void> shutdown() {
        if (this.stopping) {
            SettableListenableFuture<Void> future = new SettableListenableFuture<>();
            future.set(null);
            return future;
        }

        this.stopping = true;

        Mono<Void> result;
        if (this.channelGroup != null) {
            result = FutureMono.from(this.channelGroup.close());
            if (this.loopResources != null) {
                result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
            }
            if (this.poolResources != null) {
                result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
            }
            result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
        }
        else {
            result = stopScheduler();
        }

        return new MonoToListenableFutureAdapter<>(result);
    }

    /**
     * Return a Mono that, when subscribed, disposes the scheduler and then
     * checks up to 20 times, sleeping 100 ms between checks, whether it reports
     * itself as disposed. The wait ends early if the sleep throws.
     */
    private Mono<Void> stopScheduler() {
        return Mono.fromRunnable(() -> {
            this.scheduler.dispose();
            for (int i = 0; i < 20; i++) {
                if (this.scheduler.isDisposed()) {
                    break;
                }
                try {
                    Thread.sleep(100);
                }
                catch (Throwable ex) {
                    break;
                }
            }
        });
    }

    @Override
    public String toString() {
        return "ReactorNettyTcpClient[" + this.tcpClient + "]";
    }


    /**
     * Per-connection I/O handler given to {@code TcpClient.handle}: adapts the
     * Reactor Netty inbound/outbound pair to a {@link TcpConnection} and
     * forwards connection events to the {@link TcpConnectionHandler}.
     */
    private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {

        private final TcpConnectionHandler<P> connectionHandler;

        ReactorNettyHandler(TcpConnectionHandler<P> handler) {
            this.connectionHandler = handler;
        }

        @Override
        @SuppressWarnings("unchecked")
        public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
            inbound.withConnection(conn -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connected to " + conn.address());
                }
            });
            // NOTE(review): the processor is handed to ReactorNettyTcpConnection,
            // which presumably terminates it when the connection is closed -- confirm.
            DirectProcessor<Void> completion = DirectProcessor.create();
            TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
            // Notify the handler on the client scheduler rather than on the calling thread.
            scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));

            // Decode raw bytes into messages within the channel pipeline.
            inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));

            // Deliver decoded messages, failures and the close event to the
            // handler on the client scheduler.
            inbound.receiveObject()
                    .cast(Message.class)
                    .publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
                    .subscribe(
                            this.connectionHandler::handleMessage,
                            this.connectionHandler::handleFailure,
                            this.connectionHandler::afterConnectionClosed);

            return completion;
        }
    }


    /**
     * Netty decoder that delegates to the configured {@link ReactorNettyCodec}
     * and passes every decoded message on to the next handler.
     */
    private static class StompMessageDecoder<P> extends ByteToMessageDecoder {

        private final ReactorNettyCodec<P> codec;

        public StompMessageDecoder(ReactorNettyCodec<P> codec) {
            this.codec = codec;
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            Collection<Message<P>> messages = this.codec.decode(in);
            out.addAll(messages);
        }
    }

}