001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import org.springframework.lang.Nullable; 020import org.springframework.messaging.simp.SimpLogging; 021import org.springframework.messaging.tcp.TcpOperations; 022import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; 023import org.springframework.util.Assert; 024import org.springframework.util.concurrent.ListenableFuture; 025 026/** 027 * A STOMP over TCP client that uses {@link ReactorNettyTcpClient}. 028 * 029 * @author Rossen Stoyanchev 030 * @since 5.0 031 */ 032public class ReactorNettyTcpStompClient extends StompClientSupport { 033 034 private final TcpOperations<byte[]> tcpClient; 035 036 037 /** 038 * Create an instance with host "127.0.0.1" and port 61613. 039 */ 040 public ReactorNettyTcpStompClient() { 041 this("127.0.0.1", 61613); 042 } 043 044 /** 045 * Create an instance with the given host and port. 046 * @param host the host 047 * @param port the port 048 */ 049 public ReactorNettyTcpStompClient(String host, int port) { 050 this.tcpClient = initTcpClient(host, port); 051 } 052 053 /** 054 * Create an instance with a pre-configured TCP client. 055 * @param tcpClient the client to use 056 */ 057 public ReactorNettyTcpStompClient(TcpOperations<byte[]> tcpClient) { 058 Assert.notNull(tcpClient, "'tcpClient' is required"); 059 this.tcpClient = tcpClient; 060 } 061 062 private static ReactorNettyTcpClient<byte[]> initTcpClient(String host, int port) { 063 ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(host, port, new StompReactorNettyCodec()); 064 client.setLogger(SimpLogging.forLog(client.getLogger())); 065 return client; 066 } 067 068 069 /** 070 * Connect and notify the given {@link StompSessionHandler} when connected 071 * on the STOMP level. 072 * @param handler the handler for the STOMP session 073 * @return a ListenableFuture for access to the session when ready for use 074 */ 075 public ListenableFuture<StompSession> connect(StompSessionHandler handler) { 076 return connect(null, handler); 077 } 078 079 /** 080 * An overloaded version of {@link #connect(StompSessionHandler)} that 081 * accepts headers to use for the STOMP CONNECT frame. 082 * @param connectHeaders headers to add to the CONNECT frame 083 * @param handler the handler for the STOMP session 084 * @return a ListenableFuture for access to the session when ready for use 085 */ 086 public ListenableFuture<StompSession> connect(@Nullable StompHeaders connectHeaders, StompSessionHandler handler) { 087 ConnectionHandlingStompSession session = createSession(connectHeaders, handler); 088 this.tcpClient.connect(session); 089 return session.getSessionFuture(); 090 } 091 092 /** 093 * Shut down the client and release resources. 094 */ 095 public void shutdown() { 096 this.tcpClient.shutdown(); 097 } 098 099 @Override 100 public String toString() { 101 return "ReactorNettyTcpStompClient[" + this.tcpClient + "]"; 102 } 103}