001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.util.Arrays;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023
024import org.springframework.messaging.converter.MessageConverter;
025import org.springframework.messaging.converter.SimpleMessageConverter;
026import org.springframework.scheduling.TaskScheduler;
027import org.springframework.util.Assert;
028
029/**
030 * Base class for STOMP client implementations.
031 *
032 * <p>Subclasses can connect over WebSocket or TCP using any library. When creating
033 * a new connection, a subclass can create an instance of @link DefaultStompSession}
034 * which extends {@link org.springframework.messaging.tcp.TcpConnectionHandler}
035 * whose lifecycle methods the subclass must then invoke.
036 *
037 * <p>In effect, {@code TcpConnectionHandler} and {@code TcpConnection} are the
038 * contracts that any subclass must adapt to while using {@link StompEncoder}
039 * and {@link StompDecoder} to encode and decode STOMP messages.
040 *
041 * @author Rossen Stoyanchev
042 * @since 4.2
043 */
044public abstract class StompClientSupport {
045
046        protected Log logger = LogFactory.getLog(getClass());
047
048        private MessageConverter messageConverter = new SimpleMessageConverter();
049
050        private TaskScheduler taskScheduler;
051
052        private long[] defaultHeartbeat = new long[] {10000, 10000};
053
054        private long receiptTimeLimit = 15 * 1000;
055
056
057        /**
058         * Set the {@link MessageConverter} to use to convert the payload of incoming
059         * and outgoing messages to and from {@code byte[]} based on object type
060         * and the "content-type" header.
061         * <p>By default, {@link SimpleMessageConverter} is configured.
062         * @param messageConverter the message converter to use
063         */
064        public void setMessageConverter(MessageConverter messageConverter) {
065                Assert.notNull(messageConverter, "MessageConverter must not be null");
066                this.messageConverter = messageConverter;
067        }
068
069        /**
070         * Return the configured {@link MessageConverter}.
071         */
072        public MessageConverter getMessageConverter() {
073                return this.messageConverter;
074        }
075
076        /**
077         * Configure a scheduler to use for heartbeats and for receipt tracking.
078         * <p><strong>Note:</strong> Some transports have built-in support to work
079         * with heartbeats and therefore do not require a TaskScheduler.
080         * Receipts however, if needed, do require a TaskScheduler to be configured.
081         * <p>By default, this is not set.
082         */
083        public void setTaskScheduler(TaskScheduler taskScheduler) {
084                this.taskScheduler = taskScheduler;
085        }
086
087        /**
088         * The configured TaskScheduler.
089         */
090        public TaskScheduler getTaskScheduler() {
091                return this.taskScheduler;
092        }
093
094        /**
095         * Configure the default value for the "heart-beat" header of the STOMP
096         * CONNECT frame. The first number represents how often the client will write
097         * or send a heart-beat. The second is how often the server should write.
098         * A value of 0 means no heart-beats.
099         * <p>By default this is set to "10000,10000" but subclasses may override
100         * that default and for example set it to "0,0" if they require a
101         * TaskScheduler to be configured first.
102         * @param heartbeat the value for the CONNECT "heart-beat" header
103         * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#Heart-beating">
104         * https://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>
105         */
106        public void setDefaultHeartbeat(long[] heartbeat) {
107                if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
108                        throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat));
109                }
110                this.defaultHeartbeat = heartbeat;
111        }
112
113        /**
114         * Return the configured default heart-beat value (never {@code null}).
115         */
116        public long[] getDefaultHeartbeat() {
117                return this.defaultHeartbeat;
118        }
119
120        /**
121         * Determine whether heartbeats are enabled.
122         * <p>Returns {@code false} if {@link #setDefaultHeartbeat defaultHeartbeat}
123         * is set to "0,0", and {@code true} otherwise.
124         */
125        public boolean isDefaultHeartbeatEnabled() {
126                long[] heartbeat = getDefaultHeartbeat();
127                return (heartbeat[0] != 0 && heartbeat[1] != 0);
128        }
129
130        /**
131         * Configure the number of milliseconds before a receipt is considered expired.
132         * <p>By default set to 15,000 (15 seconds).
133         */
134        public void setReceiptTimeLimit(long receiptTimeLimit) {
135                Assert.isTrue(receiptTimeLimit > 0, "Receipt time limit must be larger than zero");
136                this.receiptTimeLimit = receiptTimeLimit;
137        }
138
139        /**
140         * Return the configured receipt time limit.
141         */
142        public long getReceiptTimeLimit() {
143                return this.receiptTimeLimit;
144        }
145
146
147        /**
148         * Factory method for create and configure a new session.
149         * @param connectHeaders headers for the STOMP CONNECT frame
150         * @param handler the handler for the STOMP session
151         * @return the created session
152         */
153        protected ConnectionHandlingStompSession createSession(StompHeaders connectHeaders, StompSessionHandler handler) {
154                connectHeaders = processConnectHeaders(connectHeaders);
155                DefaultStompSession session = new DefaultStompSession(handler, connectHeaders);
156                session.setMessageConverter(getMessageConverter());
157                session.setTaskScheduler(getTaskScheduler());
158                session.setReceiptTimeLimit(getReceiptTimeLimit());
159                return session;
160        }
161
162        /**
163         * Further initialize the StompHeaders, for example setting the heart-beat
164         * header if necessary.
165         * @param connectHeaders the headers to modify
166         * @return the modified headers
167         */
168        protected StompHeaders processConnectHeaders(StompHeaders connectHeaders) {
169                connectHeaders = (connectHeaders != null ? connectHeaders : new StompHeaders());
170                if (connectHeaders.getHeartbeat() == null) {
171                        connectHeaders.setHeartbeat(getDefaultHeartbeat());
172                }
173                return connectHeaders;
174        }
175
176}