001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import java.util.Arrays; 020import java.util.concurrent.TimeUnit; 021 022import org.springframework.lang.Nullable; 023import org.springframework.messaging.converter.MessageConverter; 024import org.springframework.messaging.converter.SimpleMessageConverter; 025import org.springframework.scheduling.TaskScheduler; 026import org.springframework.util.Assert; 027 028/** 029 * Base class for STOMP client implementations. 030 * 031 * <p>Subclasses can connect over WebSocket or TCP using any library. When creating 032 * a new connection, a subclass can create an instance of @link DefaultStompSession} 033 * which extends {@link org.springframework.messaging.tcp.TcpConnectionHandler} 034 * whose lifecycle methods the subclass must then invoke. 035 * 036 * <p>In effect, {@code TcpConnectionHandler} and {@code TcpConnection} are the 037 * contracts that any subclass must adapt to while using {@link StompEncoder} 038 * and {@link StompDecoder} to encode and decode STOMP messages. 039 * 040 * @author Rossen Stoyanchev 041 * @since 4.2 042 */ 043public abstract class StompClientSupport { 044 045 private MessageConverter messageConverter = new SimpleMessageConverter(); 046 047 @Nullable 048 private TaskScheduler taskScheduler; 049 050 private long[] defaultHeartbeat = new long[] {10000, 10000}; 051 052 private long receiptTimeLimit = TimeUnit.SECONDS.toMillis(15); 053 054 055 /** 056 * Set the {@link MessageConverter} to use to convert the payload of incoming 057 * and outgoing messages to and from {@code byte[]} based on object type 058 * and the "content-type" header. 059 * <p>By default, {@link SimpleMessageConverter} is configured. 060 * @param messageConverter the message converter to use 061 */ 062 public void setMessageConverter(MessageConverter messageConverter) { 063 Assert.notNull(messageConverter, "MessageConverter must not be null"); 064 this.messageConverter = messageConverter; 065 } 066 067 /** 068 * Return the configured {@link MessageConverter}. 069 */ 070 public MessageConverter getMessageConverter() { 071 return this.messageConverter; 072 } 073 074 /** 075 * Configure a scheduler to use for heartbeats and for receipt tracking. 076 * <p><strong>Note:</strong> Some transports have built-in support to work 077 * with heartbeats and therefore do not require a TaskScheduler. 078 * Receipts however, if needed, do require a TaskScheduler to be configured. 079 * <p>By default, this is not set. 080 */ 081 public void setTaskScheduler(@Nullable TaskScheduler taskScheduler) { 082 this.taskScheduler = taskScheduler; 083 } 084 085 /** 086 * The configured TaskScheduler. 087 */ 088 @Nullable 089 public TaskScheduler getTaskScheduler() { 090 return this.taskScheduler; 091 } 092 093 /** 094 * Configure the default value for the "heart-beat" header of the STOMP 095 * CONNECT frame. The first number represents how often the client will write 096 * or send a heart-beat. The second is how often the server should write. 097 * A value of 0 means no heart-beats. 098 * <p>By default this is set to "10000,10000" but subclasses may override 099 * that default and for example set it to "0,0" if they require a 100 * TaskScheduler to be configured first. 101 * @param heartbeat the value for the CONNECT "heart-beat" header 102 * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#Heart-beating"> 103 * https://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a> 104 */ 105 public void setDefaultHeartbeat(long[] heartbeat) { 106 if (heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) { 107 throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat)); 108 } 109 this.defaultHeartbeat = heartbeat; 110 } 111 112 /** 113 * Return the configured default heart-beat value (never {@code null}). 114 */ 115 public long[] getDefaultHeartbeat() { 116 return this.defaultHeartbeat; 117 } 118 119 /** 120 * Determine whether heartbeats are enabled. 121 * <p>Returns {@code false} if {@link #setDefaultHeartbeat defaultHeartbeat} 122 * is set to "0,0", and {@code true} otherwise. 123 */ 124 public boolean isDefaultHeartbeatEnabled() { 125 long[] heartbeat = getDefaultHeartbeat(); 126 return (heartbeat[0] != 0 && heartbeat[1] != 0); 127 } 128 129 /** 130 * Configure the number of milliseconds before a receipt is considered expired. 131 * <p>By default set to 15,000 (15 seconds). 132 */ 133 public void setReceiptTimeLimit(long receiptTimeLimit) { 134 Assert.isTrue(receiptTimeLimit > 0, "Receipt time limit must be larger than zero"); 135 this.receiptTimeLimit = receiptTimeLimit; 136 } 137 138 /** 139 * Return the configured receipt time limit. 140 */ 141 public long getReceiptTimeLimit() { 142 return this.receiptTimeLimit; 143 } 144 145 146 /** 147 * Factory method for create and configure a new session. 148 * @param connectHeaders headers for the STOMP CONNECT frame 149 * @param handler the handler for the STOMP session 150 * @return the created session 151 */ 152 protected ConnectionHandlingStompSession createSession( 153 @Nullable StompHeaders connectHeaders, StompSessionHandler handler) { 154 155 connectHeaders = processConnectHeaders(connectHeaders); 156 DefaultStompSession session = new DefaultStompSession(handler, connectHeaders); 157 session.setMessageConverter(getMessageConverter()); 158 session.setTaskScheduler(getTaskScheduler()); 159 session.setReceiptTimeLimit(getReceiptTimeLimit()); 160 return session; 161 } 162 163 /** 164 * Further initialize the StompHeaders, for example setting the heart-beat 165 * header if necessary. 166 * @param connectHeaders the headers to modify 167 * @return the modified headers 168 */ 169 protected StompHeaders processConnectHeaders(@Nullable StompHeaders connectHeaders) { 170 connectHeaders = (connectHeaders != null ? connectHeaders : new StompHeaders()); 171 if (connectHeaders.getHeartbeat() == null) { 172 connectHeaders.setHeartbeat(getDefaultHeartbeat()); 173 } 174 return connectHeaders; 175 } 176 177}