001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import io.netty.channel.EventLoopGroup; 020import reactor.Environment; 021import reactor.io.net.NetStreams.TcpClientFactory; 022import reactor.io.net.Spec.TcpClientSpec; 023import reactor.io.net.impl.netty.NettyClientSocketOptions; 024 025import org.springframework.context.Lifecycle; 026import org.springframework.messaging.Message; 027import org.springframework.messaging.tcp.TcpOperations; 028import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; 029import org.springframework.util.concurrent.ListenableFuture; 030 031/** 032 * A STOMP over TCP client that uses {@link Reactor2TcpClient}. 033 * 034 * @author Rossen Stoyanchev 035 * @since 4.2 036 */ 037public class Reactor2TcpStompClient extends StompClientSupport implements Lifecycle { 038 039 private final TcpOperations<byte[]> tcpClient; 040 041 private final EventLoopGroup eventLoopGroup; 042 043 private final Environment environment; 044 045 private volatile boolean running = false; 046 047 048 /** 049 * Create an instance with host "127.0.0.1" and port 61613. 050 */ 051 public Reactor2TcpStompClient() { 052 this("127.0.0.1", 61613); 053 } 054 055 /** 056 * Create an instance with the given host and port to connect to 057 */ 058 public Reactor2TcpStompClient(String host, int port) { 059 this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup(); 060 this.environment = new Environment(); 061 this.tcpClient = new Reactor2TcpClient<byte[]>( 062 new StompTcpClientSpecFactory(host, port, this.eventLoopGroup, this.environment)); 063 } 064 065 /** 066 * Create an instance with a pre-configured TCP client. 067 * @param tcpClient the client to use 068 */ 069 public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) { 070 this.tcpClient = tcpClient; 071 this.eventLoopGroup = null; 072 this.environment = null; 073 } 074 075 076 @Override 077 public void start() { 078 if (!isRunning()) { 079 this.running = true; 080 } 081 } 082 083 @Override 084 public void stop() { 085 if (isRunning()) { 086 this.running = false; 087 try { 088 if (this.eventLoopGroup != null) { 089 this.eventLoopGroup.shutdownGracefully().await(5000); 090 } 091 if (this.environment != null) { 092 this.environment.shutdown(); 093 } 094 } 095 catch (InterruptedException ex) { 096 if (logger.isErrorEnabled()) { 097 logger.error("Failed to shutdown gracefully", ex); 098 } 099 } 100 } 101 } 102 103 @Override 104 public boolean isRunning() { 105 return this.running; 106 } 107 108 109 /** 110 * Connect and notify the given {@link StompSessionHandler} when connected 111 * on the STOMP level. 112 * @param handler the handler for the STOMP session 113 * @return ListenableFuture for access to the session when ready for use 114 */ 115 public ListenableFuture<StompSession> connect(StompSessionHandler handler) { 116 return connect(null, handler); 117 } 118 119 /** 120 * An overloaded version of {@link #connect(StompSessionHandler)} that 121 * accepts headers to use for the STOMP CONNECT frame. 122 * @param connectHeaders headers to add to the CONNECT frame 123 * @param handler the handler for the STOMP session 124 * @return ListenableFuture for access to the session when ready for use 125 */ 126 public ListenableFuture<StompSession> connect(StompHeaders connectHeaders, StompSessionHandler handler) { 127 ConnectionHandlingStompSession session = createSession(connectHeaders, handler); 128 this.tcpClient.connect(session); 129 return session.getSessionFuture(); 130 } 131 132 /** 133 * Shut down the client and release resources. 134 */ 135 public void shutdown() { 136 this.tcpClient.shutdown(); 137 } 138 139 140 private static class StompTcpClientSpecFactory implements TcpClientFactory<Message<byte[]>, Message<byte[]>> { 141 142 private final String host; 143 144 private final int port; 145 146 private final NettyClientSocketOptions socketOptions; 147 148 private final Environment environment; 149 150 private final Reactor2StompCodec codec; 151 152 153 StompTcpClientSpecFactory(String host, int port, EventLoopGroup group, Environment environment) { 154 this.host = host; 155 this.port = port; 156 this.socketOptions = new NettyClientSocketOptions().eventLoopGroup(group); 157 this.environment = environment; 158 this.codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder()); 159 } 160 161 @Override 162 public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply( 163 TcpClientSpec<Message<byte[]>, Message<byte[]>> clientSpec) { 164 165 return clientSpec 166 .env(this.environment) 167 .dispatcher(this.environment.getDispatcher(Environment.WORK_QUEUE)) 168 .connect(this.host, this.port) 169 .codec(this.codec) 170 .options(this.socketOptions); 171 } 172 } 173 174}