001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.tcp.reactor;
018
019import java.time.Duration;
020import java.util.Collection;
021import java.util.List;
022import java.util.Optional;
023import java.util.function.BiFunction;
024import java.util.function.Consumer;
025import java.util.function.Function;
026
027import io.netty.buffer.ByteBuf;
028import io.netty.channel.ChannelHandlerContext;
029import io.netty.channel.group.ChannelGroup;
030import io.netty.channel.group.DefaultChannelGroup;
031import io.netty.handler.codec.ByteToMessageDecoder;
032import io.netty.util.concurrent.ImmediateEventExecutor;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.reactivestreams.Publisher;
036import reactor.core.publisher.DirectProcessor;
037import reactor.core.publisher.Flux;
038import reactor.core.publisher.Mono;
039import reactor.core.publisher.MonoProcessor;
040import reactor.core.scheduler.Scheduler;
041import reactor.core.scheduler.Schedulers;
042import reactor.netty.Connection;
043import reactor.netty.FutureMono;
044import reactor.netty.NettyInbound;
045import reactor.netty.NettyOutbound;
046import reactor.netty.resources.ConnectionProvider;
047import reactor.netty.resources.LoopResources;
048import reactor.netty.tcp.TcpClient;
049
050import org.springframework.lang.Nullable;
051import org.springframework.messaging.Message;
052import org.springframework.messaging.tcp.ReconnectStrategy;
053import org.springframework.messaging.tcp.TcpConnection;
054import org.springframework.messaging.tcp.TcpConnectionHandler;
055import org.springframework.messaging.tcp.TcpOperations;
056import org.springframework.util.Assert;
057import org.springframework.util.concurrent.ListenableFuture;
058import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
059import org.springframework.util.concurrent.SettableListenableFuture;
060
061/**
062 * Reactor Netty based implementation of {@link TcpOperations}.
063 *
064 * @author Rossen Stoyanchev
065 * @author Stephane Maldini
066 * @since 5.0
067 * @param <P> the type of payload for in and outbound messages
068 */
069public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
070
071        private static final int PUBLISH_ON_BUFFER_SIZE = 16;
072
073
074        private final TcpClient tcpClient;
075
076        private final ReactorNettyCodec<P> codec;
077
078        @Nullable
079        private final ChannelGroup channelGroup;
080
081        @Nullable
082        private final LoopResources loopResources;
083
084        @Nullable
085        private final ConnectionProvider poolResources;
086
087        private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler");
088
089        private Log logger = LogFactory.getLog(ReactorNettyTcpClient.class);
090
091        private volatile boolean stopping = false;
092
093
094        /**
095         * Simple constructor with the host and port to use to connect to.
096         * <p>This constructor manages the lifecycle of the {@link TcpClient} and
097         * underlying resources such as {@link ConnectionProvider},
098         * {@link LoopResources}, and {@link ChannelGroup}.
099         * <p>For full control over the initialization and lifecycle of the
100         * TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
101         * @param host the host to connect to
102         * @param port the port to connect to
103         * @param codec for encoding and decoding the input/output byte streams
104         * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
105         */
106        @SuppressWarnings("deprecation")
107        public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
108                Assert.notNull(host, "host is required");
109                Assert.notNull(codec, "ReactorNettyCodec is required");
110
111                this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
112                this.loopResources = LoopResources.create("tcp-client-loop");
113                this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
114                this.codec = codec;
115
116                this.tcpClient = TcpClient.create(this.poolResources)
117                                .host(host).port(port)
118                                .runOn(this.loopResources, false)
119                                .doOnConnected(conn -> this.channelGroup.add(conn.channel()));
120        }
121
122        /**
123         * A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
124         * that still manages the lifecycle of the {@link TcpClient} and underlying
125         * resources, but allows for direct configuration of other properties of the
126         * client through a {@code Function<TcpClient, TcpClient>}.
127         * @param clientConfigurer the configurer function
128         * @param codec for encoding and decoding the input/output byte streams
129         * @since 5.1.3
130         * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
131         */
132        @SuppressWarnings("deprecation")
133        public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
134                Assert.notNull(codec, "ReactorNettyCodec is required");
135
136                this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
137                this.loopResources = LoopResources.create("tcp-client-loop");
138                this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
139                this.codec = codec;
140
141                this.tcpClient = clientConfigurer.apply(TcpClient
142                                .create(this.poolResources)
143                                .runOn(this.loopResources, false)
144                                .doOnConnected(conn -> this.channelGroup.add(conn.channel())));
145        }
146
147        /**
148         * Constructor with an externally created {@link TcpClient} instance whose
149         * lifecycle is expected to be managed externally.
150         * @param tcpClient the TcpClient instance to use
151         * @param codec for encoding and decoding the input/output byte streams
152         * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
153         */
154        public ReactorNettyTcpClient(TcpClient tcpClient, ReactorNettyCodec<P> codec) {
155                Assert.notNull(tcpClient, "TcpClient is required");
156                Assert.notNull(codec, "ReactorNettyCodec is required");
157                this.tcpClient = tcpClient;
158                this.codec = codec;
159
160                this.channelGroup = null;
161                this.loopResources = null;
162                this.poolResources = null;
163        }
164
165
166        /**
167         * Set an alternative logger to use than the one based on the class name.
168         * @param logger the logger to use
169         * @since 5.1
170         */
171        public void setLogger(Log logger) {
172                this.logger = logger;
173        }
174
175        /**
176         * Return the currently configured Logger.
177         * @since 5.1
178         */
179        public Log getLogger() {
180                return logger;
181        }
182
183
184        @Override
185        public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
186                Assert.notNull(handler, "TcpConnectionHandler is required");
187
188                if (this.stopping) {
189                        return handleShuttingDownConnectFailure(handler);
190                }
191
192                Mono<Void> connectMono = this.tcpClient
193                                .handle(new ReactorNettyHandler(handler))
194                                .connect()
195                                .doOnError(handler::afterConnectFailure)
196                                .then();
197
198                return new MonoToListenableFutureAdapter<>(connectMono);
199        }
200
201        @Override
202        @SuppressWarnings("deprecation")
203        public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
204                Assert.notNull(handler, "TcpConnectionHandler is required");
205                Assert.notNull(strategy, "ReconnectStrategy is required");
206
207                if (this.stopping) {
208                        return handleShuttingDownConnectFailure(handler);
209                }
210
211                // Report first connect to the ListenableFuture
212                MonoProcessor<Void> connectMono = MonoProcessor.create();
213
214                this.tcpClient
215                                .handle(new ReactorNettyHandler(handler))
216                                .connect()
217                                .doOnNext(updateConnectMono(connectMono))
218                                .doOnError(updateConnectMono(connectMono))
219                                .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
220                                .flatMap(Connection::onDispose)             // post-connect issues
221                                .retryWhen(reconnectFunction(strategy))
222                                .repeatWhen(reconnectFunction(strategy))
223                                .subscribe();
224
225                return new MonoToListenableFutureAdapter<>(connectMono);
226        }
227
228        private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
229                IllegalStateException ex = new IllegalStateException("Shutting down.");
230                handler.afterConnectFailure(ex);
231                return new MonoToListenableFutureAdapter<>(Mono.error(ex));
232        }
233
234        private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
235                return o -> {
236                        if (!connectMono.isTerminated()) {
237                                if (o instanceof Throwable) {
238                                        connectMono.onError((Throwable) o);
239                                }
240                                else {
241                                        connectMono.onComplete();
242                                }
243                        }
244                };
245        }
246
247        private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
248                return flux -> flux
249                                .scan(1, (count, element) -> count++)
250                                .flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
251                                                .map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
252                                                .orElse(Mono.empty()));
253        }
254
255        @Override
256        public ListenableFuture<Void> shutdown() {
257                if (this.stopping) {
258                        SettableListenableFuture<Void> future = new SettableListenableFuture<>();
259                        future.set(null);
260                        return future;
261                }
262
263                this.stopping = true;
264
265                Mono<Void> result;
266                if (this.channelGroup != null) {
267                        result = FutureMono.from(this.channelGroup.close());
268                        if (this.loopResources != null) {
269                                result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
270                        }
271                        if (this.poolResources != null) {
272                                result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
273                        }
274                        result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
275                }
276                else {
277                        result = stopScheduler();
278                }
279
280                return new MonoToListenableFutureAdapter<>(result);
281        }
282
283        private Mono<Void> stopScheduler() {
284                return Mono.fromRunnable(() -> {
285                        this.scheduler.dispose();
286                        for (int i = 0; i < 20; i++) {
287                                if (this.scheduler.isDisposed()) {
288                                        break;
289                                }
290                                try {
291                                        Thread.sleep(100);
292                                }
293                                catch (Throwable ex) {
294                                        break;
295                                }
296                        }
297                });
298        }
299
300        @Override
301        public String toString() {
302                return "ReactorNettyTcpClient[" + this.tcpClient + "]";
303        }
304
305
306        private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
307
308                private final TcpConnectionHandler<P> connectionHandler;
309
310                ReactorNettyHandler(TcpConnectionHandler<P> handler) {
311                        this.connectionHandler = handler;
312                }
313
314                @Override
315                @SuppressWarnings("unchecked")
316                public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
317                        inbound.withConnection(conn -> {
318                                if (logger.isDebugEnabled()) {
319                                        logger.debug("Connected to " + conn.address());
320                                }
321                        });
322                        DirectProcessor<Void> completion = DirectProcessor.create();
323                        TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound,  codec, completion);
324                        scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
325
326                        inbound.withConnection(conn -> conn.addHandler(new StompMessageDecoder<>(codec)));
327
328                        inbound.receiveObject()
329                                        .cast(Message.class)
330                                        .publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
331                                        .subscribe(
332                                                        this.connectionHandler::handleMessage,
333                                                        this.connectionHandler::handleFailure,
334                                                        this.connectionHandler::afterConnectionClosed);
335
336                        return completion;
337                }
338        }
339
340
341        private static class StompMessageDecoder<P> extends ByteToMessageDecoder {
342
343                private final ReactorNettyCodec<P> codec;
344
345                public StompMessageDecoder(ReactorNettyCodec<P> codec) {
346                        this.codec = codec;
347                }
348
349                @Override
350                protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
351                        Collection<Message<P>> messages = this.codec.decode(in);
352                        out.addAll(messages);
353                }
354        }
355
356}