001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import java.util.Arrays; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023 024import org.springframework.messaging.converter.MessageConverter; 025import org.springframework.messaging.converter.SimpleMessageConverter; 026import org.springframework.scheduling.TaskScheduler; 027import org.springframework.util.Assert; 028 029/** 030 * Base class for STOMP client implementations. 031 * 032 * <p>Subclasses can connect over WebSocket or TCP using any library. When creating 033 * a new connection, a subclass can create an instance of @link DefaultStompSession} 034 * which extends {@link org.springframework.messaging.tcp.TcpConnectionHandler} 035 * whose lifecycle methods the subclass must then invoke. 036 * 037 * <p>In effect, {@code TcpConnectionHandler} and {@code TcpConnection} are the 038 * contracts that any subclass must adapt to while using {@link StompEncoder} 039 * and {@link StompDecoder} to encode and decode STOMP messages. 040 * 041 * @author Rossen Stoyanchev 042 * @since 4.2 043 */ 044public abstract class StompClientSupport { 045 046 protected Log logger = LogFactory.getLog(getClass()); 047 048 private MessageConverter messageConverter = new SimpleMessageConverter(); 049 050 private TaskScheduler taskScheduler; 051 052 private long[] defaultHeartbeat = new long[] {10000, 10000}; 053 054 private long receiptTimeLimit = 15 * 1000; 055 056 057 /** 058 * Set the {@link MessageConverter} to use to convert the payload of incoming 059 * and outgoing messages to and from {@code byte[]} based on object type 060 * and the "content-type" header. 061 * <p>By default, {@link SimpleMessageConverter} is configured. 062 * @param messageConverter the message converter to use 063 */ 064 public void setMessageConverter(MessageConverter messageConverter) { 065 Assert.notNull(messageConverter, "MessageConverter must not be null"); 066 this.messageConverter = messageConverter; 067 } 068 069 /** 070 * Return the configured {@link MessageConverter}. 071 */ 072 public MessageConverter getMessageConverter() { 073 return this.messageConverter; 074 } 075 076 /** 077 * Configure a scheduler to use for heartbeats and for receipt tracking. 078 * <p><strong>Note:</strong> Some transports have built-in support to work 079 * with heartbeats and therefore do not require a TaskScheduler. 080 * Receipts however, if needed, do require a TaskScheduler to be configured. 081 * <p>By default, this is not set. 082 */ 083 public void setTaskScheduler(TaskScheduler taskScheduler) { 084 this.taskScheduler = taskScheduler; 085 } 086 087 /** 088 * The configured TaskScheduler. 089 */ 090 public TaskScheduler getTaskScheduler() { 091 return this.taskScheduler; 092 } 093 094 /** 095 * Configure the default value for the "heart-beat" header of the STOMP 096 * CONNECT frame. The first number represents how often the client will write 097 * or send a heart-beat. The second is how often the server should write. 098 * A value of 0 means no heart-beats. 099 * <p>By default this is set to "10000,10000" but subclasses may override 100 * that default and for example set it to "0,0" if they require a 101 * TaskScheduler to be configured first. 102 * @param heartbeat the value for the CONNECT "heart-beat" header 103 * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#Heart-beating"> 104 * https://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a> 105 */ 106 public void setDefaultHeartbeat(long[] heartbeat) { 107 if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) { 108 throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat)); 109 } 110 this.defaultHeartbeat = heartbeat; 111 } 112 113 /** 114 * Return the configured default heart-beat value (never {@code null}). 115 */ 116 public long[] getDefaultHeartbeat() { 117 return this.defaultHeartbeat; 118 } 119 120 /** 121 * Determine whether heartbeats are enabled. 122 * <p>Returns {@code false} if {@link #setDefaultHeartbeat defaultHeartbeat} 123 * is set to "0,0", and {@code true} otherwise. 124 */ 125 public boolean isDefaultHeartbeatEnabled() { 126 long[] heartbeat = getDefaultHeartbeat(); 127 return (heartbeat[0] != 0 && heartbeat[1] != 0); 128 } 129 130 /** 131 * Configure the number of milliseconds before a receipt is considered expired. 132 * <p>By default set to 15,000 (15 seconds). 133 */ 134 public void setReceiptTimeLimit(long receiptTimeLimit) { 135 Assert.isTrue(receiptTimeLimit > 0, "Receipt time limit must be larger than zero"); 136 this.receiptTimeLimit = receiptTimeLimit; 137 } 138 139 /** 140 * Return the configured receipt time limit. 141 */ 142 public long getReceiptTimeLimit() { 143 return this.receiptTimeLimit; 144 } 145 146 147 /** 148 * Factory method for create and configure a new session. 149 * @param connectHeaders headers for the STOMP CONNECT frame 150 * @param handler the handler for the STOMP session 151 * @return the created session 152 */ 153 protected ConnectionHandlingStompSession createSession(StompHeaders connectHeaders, StompSessionHandler handler) { 154 connectHeaders = processConnectHeaders(connectHeaders); 155 DefaultStompSession session = new DefaultStompSession(handler, connectHeaders); 156 session.setMessageConverter(getMessageConverter()); 157 session.setTaskScheduler(getTaskScheduler()); 158 session.setReceiptTimeLimit(getReceiptTimeLimit()); 159 return session; 160 } 161 162 /** 163 * Further initialize the StompHeaders, for example setting the heart-beat 164 * header if necessary. 165 * @param connectHeaders the headers to modify 166 * @return the modified headers 167 */ 168 protected StompHeaders processConnectHeaders(StompHeaders connectHeaders) { 169 connectHeaders = (connectHeaders != null ? connectHeaders : new StompHeaders()); 170 if (connectHeaders.getHeartbeat() == null) { 171 connectHeaders.setHeartbeat(getDefaultHeartbeat()); 172 } 173 return connectHeaders; 174 } 175 176}