001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import io.netty.channel.EventLoopGroup;
020import reactor.Environment;
021import reactor.io.net.NetStreams.TcpClientFactory;
022import reactor.io.net.Spec.TcpClientSpec;
023import reactor.io.net.impl.netty.NettyClientSocketOptions;
024
025import org.springframework.context.Lifecycle;
026import org.springframework.messaging.Message;
027import org.springframework.messaging.tcp.TcpOperations;
028import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
029import org.springframework.util.concurrent.ListenableFuture;
030
031/**
032 * A STOMP over TCP client that uses {@link Reactor2TcpClient}.
033 *
034 * @author Rossen Stoyanchev
035 * @since 4.2
036 */
037public class Reactor2TcpStompClient extends StompClientSupport implements Lifecycle {
038
039        private final TcpOperations<byte[]> tcpClient;
040
041        private final EventLoopGroup eventLoopGroup;
042
043        private final Environment environment;
044
045        private volatile boolean running = false;
046
047
048        /**
049         * Create an instance with host "127.0.0.1" and port 61613.
050         */
051        public Reactor2TcpStompClient() {
052                this("127.0.0.1", 61613);
053        }
054
055        /**
056         * Create an instance with the given host and port to connect to
057         */
058        public Reactor2TcpStompClient(String host, int port) {
059                this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
060                this.environment = new Environment();
061                this.tcpClient = new Reactor2TcpClient<byte[]>(
062                                new StompTcpClientSpecFactory(host, port, this.eventLoopGroup, this.environment));
063        }
064
065        /**
066         * Create an instance with a pre-configured TCP client.
067         * @param tcpClient the client to use
068         */
069        public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
070                this.tcpClient = tcpClient;
071                this.eventLoopGroup = null;
072                this.environment = null;
073        }
074
075
076        @Override
077        public void start() {
078                if (!isRunning()) {
079                        this.running = true;
080                }
081        }
082
083        @Override
084        public void stop() {
085                if (isRunning()) {
086                        this.running = false;
087                        try {
088                                if (this.eventLoopGroup != null) {
089                                        this.eventLoopGroup.shutdownGracefully().await(5000);
090                                }
091                                if (this.environment != null) {
092                                        this.environment.shutdown();
093                                }
094                        }
095                        catch (InterruptedException ex) {
096                                if (logger.isErrorEnabled()) {
097                                        logger.error("Failed to shutdown gracefully", ex);
098                                }
099                        }
100                }
101        }
102
103        @Override
104        public boolean isRunning() {
105                return this.running;
106        }
107
108
109        /**
110         * Connect and notify the given {@link StompSessionHandler} when connected
111         * on the STOMP level.
112         * @param handler the handler for the STOMP session
113         * @return ListenableFuture for access to the session when ready for use
114         */
115        public ListenableFuture<StompSession> connect(StompSessionHandler handler) {
116                return connect(null, handler);
117        }
118
119        /**
120         * An overloaded version of {@link #connect(StompSessionHandler)} that
121         * accepts headers to use for the STOMP CONNECT frame.
122         * @param connectHeaders headers to add to the CONNECT frame
123         * @param handler the handler for the STOMP session
124         * @return ListenableFuture for access to the session when ready for use
125         */
126        public ListenableFuture<StompSession> connect(StompHeaders connectHeaders, StompSessionHandler handler) {
127                ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
128                this.tcpClient.connect(session);
129                return session.getSessionFuture();
130        }
131
132        /**
133         * Shut down the client and release resources.
134         */
135        public void shutdown() {
136                this.tcpClient.shutdown();
137        }
138
139
140        private static class StompTcpClientSpecFactory implements TcpClientFactory<Message<byte[]>, Message<byte[]>> {
141
142                private final String host;
143
144                private final int port;
145
146                private final NettyClientSocketOptions socketOptions;
147
148                private final Environment environment;
149
150                private final Reactor2StompCodec codec;
151
152
153                StompTcpClientSpecFactory(String host, int port, EventLoopGroup group, Environment environment) {
154                        this.host = host;
155                        this.port = port;
156                        this.socketOptions = new NettyClientSocketOptions().eventLoopGroup(group);
157                        this.environment = environment;
158                        this.codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder());
159                }
160
161                @Override
162                public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
163                                TcpClientSpec<Message<byte[]>, Message<byte[]>> clientSpec) {
164
165                        return clientSpec
166                                        .env(this.environment)
167                                        .dispatcher(this.environment.getDispatcher(Environment.WORK_QUEUE))
168                                        .connect(this.host, this.port)
169                                        .codec(this.codec)
170                                        .options(this.socketOptions);
171                }
172        }
173
174}