001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.tcp.reactor;
018
019import java.lang.reflect.Method;
020import java.net.InetSocketAddress;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.Properties;
025
026import io.netty.channel.EventLoopGroup;
027import io.netty.channel.nio.NioEventLoopGroup;
028import io.netty.util.concurrent.Future;
029import io.netty.util.concurrent.FutureListener;
030import org.reactivestreams.Publisher;
031import reactor.Environment;
032import reactor.core.config.ConfigurationReader;
033import reactor.core.config.DispatcherConfiguration;
034import reactor.core.config.ReactorConfiguration;
035import reactor.core.support.NamedDaemonThreadFactory;
036import reactor.fn.Consumer;
037import reactor.fn.Function;
038import reactor.fn.Supplier;
039import reactor.fn.tuple.Tuple;
040import reactor.fn.tuple.Tuple2;
041import reactor.io.buffer.Buffer;
042import reactor.io.codec.Codec;
043import reactor.io.net.ChannelStream;
044import reactor.io.net.NetStreams;
045import reactor.io.net.NetStreams.TcpClientFactory;
046import reactor.io.net.ReactorChannelHandler;
047import reactor.io.net.Reconnect;
048import reactor.io.net.Spec.TcpClientSpec;
049import reactor.io.net.config.ClientSocketOptions;
050import reactor.io.net.impl.netty.NettyClientSocketOptions;
051import reactor.io.net.impl.netty.tcp.NettyTcpClient;
052import reactor.io.net.tcp.TcpClient;
053import reactor.rx.Promise;
054import reactor.rx.Promises;
055import reactor.rx.Stream;
056import reactor.rx.Streams;
057import reactor.rx.action.Signal;
058
059import org.springframework.messaging.Message;
060import org.springframework.messaging.tcp.ReconnectStrategy;
061import org.springframework.messaging.tcp.TcpConnectionHandler;
062import org.springframework.messaging.tcp.TcpOperations;
063import org.springframework.util.Assert;
064import org.springframework.util.ReflectionUtils;
065import org.springframework.util.concurrent.ListenableFuture;
066
067/**
068 * An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
069 * based on the TCP client support of project Reactor.
070 *
 * <p>This implementation wraps N Reactor {@code TcpClient} instances created
 * for N {@link #connect} calls, i.e. one instance per connection.
073 *
074 * @author Rossen Stoyanchev
075 * @author Stephane Maldini
076 * @since 4.2
077 */
078public class Reactor2TcpClient<P> implements TcpOperations<P> {
079
080        @SuppressWarnings("rawtypes")
081        public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
082
083        private static final Method eventLoopGroupMethod = initEventLoopGroupMethod();
084
085
086        private final EventLoopGroup eventLoopGroup;
087
088        private final Environment environment;
089
090        private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
091
092        private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
093                        new ArrayList<TcpClient<Message<P>, Message<P>>>();
094
095        private boolean stopping;
096
097
098        /**
099         * A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory
100         * with a default {@link reactor.core.dispatch.SynchronousDispatcher}, i.e.
101         * relying on Netty threads. The number of Netty threads can be tweaked with
102         * the {@code reactor.tcp.ioThreadCount} System property. The network I/O
103         * threads will be shared amongst the active clients.
104         *
105         * @param host the host to connect to
106         * @param port the port to connect to
107         * @param codec the codec to use for encoding and decoding the TCP stream
108         */
109        public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
110                this(new FixedAddressSupplier(host, port), codec);
111        }
112
113        /**
114         * A variant of {@link #Reactor2TcpClient(String, int, Codec)} that takes a
115         * supplier of any number of addresses instead of just one host and port.
116         * This can be used to {@link #connect(TcpConnectionHandler, ReconnectStrategy)
117         * reconnect} to a different address after the current host becomes unavailable.
118         *
119         * @param addressSupplier supplier of addresses to use for connecting
120         * @param codec the codec to use for encoding and decoding the TCP stream
121         * @since 4.3.15
122         */
123        public Reactor2TcpClient(final Supplier<InetSocketAddress> addressSupplier,
124                        final Codec<Buffer, Message<P>, Message<P>> codec) {
125
126                // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
127                final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
128                this.eventLoopGroup = nioEventLoopGroup;
129                this.environment = new Environment(new SynchronousDispatcherConfigReader());
130
131                this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() {
132                        @Override
133                        public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
134                                return spec
135                                                .env(environment)
136                                                .codec(codec)
137                                                .connect(addressSupplier)
138                                                .options(createClientSocketOptions());
139                        }
140
141                        private ClientSocketOptions createClientSocketOptions() {
142                                return (ClientSocketOptions) ReflectionUtils.invokeMethod(eventLoopGroupMethod,
143                                                new NettyClientSocketOptions(), nioEventLoopGroup);
144                        }
145                };
146        }
147
148        /**
149         * A constructor with a pre-configured {@link TcpClientSpec} {@link Function}
150         * factory. This might be used to add SSL or specific network parameters to
151         * the generated client configuration.
152         *
153         * <p><strong>NOTE:</strong> if the client is configured with a thread-creating
154         * dispatcher, you are responsible for cleaning them, e.g. via
155         * {@link reactor.core.Dispatcher#shutdown}.
156         *
157         * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for
158         * each client creation
159         */
160        public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
161                Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
162                this.tcpClientSpecFactory = tcpClientSpecFactory;
163                this.eventLoopGroup = null;
164                this.environment = null;
165        }
166
167
168        public static NioEventLoopGroup initEventLoopGroup() {
169                int ioThreadCount;
170                try {
171                        ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
172                }
173                catch (Throwable ex) {
174                        ioThreadCount = -1;
175                }
176                if (ioThreadCount <= 0) {
177                        ioThreadCount = Runtime.getRuntime().availableProcessors();
178                }
179                return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
180        }
181
182
183        @Override
184        public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
185                Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
186
187                final TcpClient<Message<P>, Message<P>> tcpClient;
188                final Runnable cleanupTask;
189                synchronized (this.tcpClients) {
190                        if (this.stopping) {
191                                IllegalStateException ex = new IllegalStateException("Shutting down.");
192                                connectionHandler.afterConnectFailure(ex);
193                                return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex));
194                        }
195                        tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
196                        this.tcpClients.add(tcpClient);
197                        cleanupTask = new Runnable() {
198                                @Override
199                                public void run() {
200                                        synchronized (tcpClients) {
201                                                tcpClients.remove(tcpClient);
202                                        }
203                                }
204                        };
205                }
206
207                Promise<Void> promise = tcpClient.start(
208                                new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask));
209
210                return new PassThroughPromiseToListenableFutureAdapter<Void>(
211                                promise.onError(new Consumer<Throwable>() {
212                                        @Override
213                                        public void accept(Throwable ex) {
214                                                cleanupTask.run();
215                                                connectionHandler.afterConnectFailure(ex);
216                                        }
217                                })
218                );
219        }
220
221        @Override
222        public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
223                Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
224                Assert.notNull(strategy, "ReconnectStrategy must not be null");
225
226                final TcpClient<Message<P>, Message<P>> tcpClient;
227                Runnable cleanupTask;
228                synchronized (this.tcpClients) {
229                        if (this.stopping) {
230                                IllegalStateException ex = new IllegalStateException("Shutting down.");
231                                connectionHandler.afterConnectFailure(ex);
232                                return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex));
233                        }
234                        tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
235                        this.tcpClients.add(tcpClient);
236                        cleanupTask = new Runnable() {
237                                @Override
238                                public void run() {
239                                        synchronized (tcpClients) {
240                                                tcpClients.remove(tcpClient);
241                                        }
242                                }
243                        };
244                }
245
246                Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
247                                new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask),
248                                new ReactorReconnectAdapter(strategy));
249
250                return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after());
251        }
252
253        @Override
254        public ListenableFuture<Void> shutdown() {
255                synchronized (this.tcpClients) {
256                        this.stopping = true;
257                }
258
259                Promise<Void> promise = Streams.from(this.tcpClients)
260                                .flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Void>>() {
261                                        @Override
262                                        public Promise<Void> apply(final TcpClient<Message<P>, Message<P>> client) {
263                                                return client.shutdown().onComplete(new Consumer<Promise<Void>>() {
264                                                        @Override
265                                                        public void accept(Promise<Void> voidPromise) {
266                                                                tcpClients.remove(client);
267                                                        }
268                                                });
269                                        }
270                                })
271                                .next();
272
273                if (this.eventLoopGroup != null) {
274                        final Promise<Void> eventLoopPromise = Promises.prepare();
275                        promise.onComplete(new Consumer<Promise<Void>>() {
276                                @Override
277                                public void accept(Promise<Void> voidPromise) {
278                                        eventLoopGroup.shutdownGracefully().addListener(new FutureListener<Object>() {
279                                                @Override
280                                                public void operationComplete(Future<Object> future) throws Exception {
281                                                        if (future.isSuccess()) {
282                                                                eventLoopPromise.onComplete();
283                                                        }
284                                                        else {
285                                                                eventLoopPromise.onError(future.cause());
286                                                        }
287                                                }
288                                        });
289                                }
290                        });
291                        promise = eventLoopPromise;
292                }
293
294                if (this.environment != null) {
295                        promise.onComplete(new Consumer<Promise<Void>>() {
296                                @Override
297                                public void accept(Promise<Void> voidPromise) {
298                                        environment.shutdown();
299                                }
300                        });
301                }
302
303                return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
304        }
305
306
307        private static Method initEventLoopGroupMethod() {
308                for (Method method : NettyClientSocketOptions.class.getMethods()) {
309                        if (method.getName().equals("eventLoopGroup") && method.getParameterTypes().length == 1) {
310                                return method;
311                        }
312                }
313                throw new IllegalStateException("No compatible Reactor version found");
314        }
315
316
317        private static class FixedAddressSupplier implements Supplier<InetSocketAddress> {
318
319                private final InetSocketAddress address;
320
321                FixedAddressSupplier(String host, int port) {
322                        this.address = new InetSocketAddress(host, port);
323                }
324
325                @Override
326                public InetSocketAddress get() {
327                        return this.address;
328                }
329        }
330
331
332        private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
333
334                @Override
335                public ReactorConfiguration read() {
336                        return new ReactorConfiguration(
337                                        Collections.<DispatcherConfiguration>emptyList(), "sync", new Properties());
338                }
339        }
340
341
342        private static class MessageChannelStreamHandler<P>
343                        implements ReactorChannelHandler<Message<P>, Message<P>, ChannelStream<Message<P>, Message<P>>> {
344
345                private final TcpConnectionHandler<P> connectionHandler;
346
347                private final Runnable cleanupTask;
348
349                public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) {
350                        this.connectionHandler = connectionHandler;
351                        this.cleanupTask = cleanupTask;
352                }
353
354                @Override
355                public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
356                        Promise<Void> closePromise = Promises.prepare();
357                        this.connectionHandler.afterConnected(new Reactor2TcpConnection<P>(channelStream, closePromise));
358                        channelStream
359                                        .finallyDo(new Consumer<Signal<Message<P>>>() {
360                                                @Override
361                                                public void accept(Signal<Message<P>> signal) {
362                                                        cleanupTask.run();
363                                                        if (signal.isOnError()) {
364                                                                connectionHandler.handleFailure(signal.getThrowable());
365                                                        }
366                                                        else if (signal.isOnComplete()) {
367                                                                connectionHandler.afterConnectionClosed();
368                                                        }
369                                                }
370                                        })
371                                        .consume(new Consumer<Message<P>>() {
372                                                @Override
373                                                public void accept(Message<P> message) {
374                                                        connectionHandler.handleMessage(message);
375                                                }
376                                        });
377
378                        return closePromise;
379                }
380        }
381
382
383        private static class ReactorReconnectAdapter implements Reconnect {
384
385                private final ReconnectStrategy strategy;
386
387                public ReactorReconnectAdapter(ReconnectStrategy strategy) {
388                        this.strategy = strategy;
389                }
390
391                @Override
392                public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
393                        return Tuple.of(address, this.strategy.getTimeToNextAttempt(attempt));
394                }
395        }
396
397}