001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.util.Arrays;
020import java.util.concurrent.TimeUnit;
021
022import org.springframework.lang.Nullable;
023import org.springframework.messaging.converter.MessageConverter;
024import org.springframework.messaging.converter.SimpleMessageConverter;
025import org.springframework.scheduling.TaskScheduler;
026import org.springframework.util.Assert;
027
028/**
029 * Base class for STOMP client implementations.
030 *
031 * <p>Subclasses can connect over WebSocket or TCP using any library. When creating
032 * a new connection, a subclass can create an instance of @link DefaultStompSession}
033 * which extends {@link org.springframework.messaging.tcp.TcpConnectionHandler}
034 * whose lifecycle methods the subclass must then invoke.
035 *
036 * <p>In effect, {@code TcpConnectionHandler} and {@code TcpConnection} are the
037 * contracts that any subclass must adapt to while using {@link StompEncoder}
038 * and {@link StompDecoder} to encode and decode STOMP messages.
039 *
040 * @author Rossen Stoyanchev
041 * @since 4.2
042 */
043public abstract class StompClientSupport {
044
045        private MessageConverter messageConverter = new SimpleMessageConverter();
046
047        @Nullable
048        private TaskScheduler taskScheduler;
049
050        private long[] defaultHeartbeat = new long[] {10000, 10000};
051
052        private long receiptTimeLimit = TimeUnit.SECONDS.toMillis(15);
053
054
055        /**
056         * Set the {@link MessageConverter} to use to convert the payload of incoming
057         * and outgoing messages to and from {@code byte[]} based on object type
058         * and the "content-type" header.
059         * <p>By default, {@link SimpleMessageConverter} is configured.
060         * @param messageConverter the message converter to use
061         */
062        public void setMessageConverter(MessageConverter messageConverter) {
063                Assert.notNull(messageConverter, "MessageConverter must not be null");
064                this.messageConverter = messageConverter;
065        }
066
067        /**
068         * Return the configured {@link MessageConverter}.
069         */
070        public MessageConverter getMessageConverter() {
071                return this.messageConverter;
072        }
073
074        /**
075         * Configure a scheduler to use for heartbeats and for receipt tracking.
076         * <p><strong>Note:</strong> Some transports have built-in support to work
077         * with heartbeats and therefore do not require a TaskScheduler.
078         * Receipts however, if needed, do require a TaskScheduler to be configured.
079         * <p>By default, this is not set.
080         */
081        public void setTaskScheduler(@Nullable TaskScheduler taskScheduler) {
082                this.taskScheduler = taskScheduler;
083        }
084
085        /**
086         * The configured TaskScheduler.
087         */
088        @Nullable
089        public TaskScheduler getTaskScheduler() {
090                return this.taskScheduler;
091        }
092
093        /**
094         * Configure the default value for the "heart-beat" header of the STOMP
095         * CONNECT frame. The first number represents how often the client will write
096         * or send a heart-beat. The second is how often the server should write.
097         * A value of 0 means no heart-beats.
098         * <p>By default this is set to "10000,10000" but subclasses may override
099         * that default and for example set it to "0,0" if they require a
100         * TaskScheduler to be configured first.
101         * @param heartbeat the value for the CONNECT "heart-beat" header
102         * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#Heart-beating">
103         * https://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>
104         */
105        public void setDefaultHeartbeat(long[] heartbeat) {
106                if (heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
107                        throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat));
108                }
109                this.defaultHeartbeat = heartbeat;
110        }
111
112        /**
113         * Return the configured default heart-beat value (never {@code null}).
114         */
115        public long[] getDefaultHeartbeat() {
116                return this.defaultHeartbeat;
117        }
118
119        /**
120         * Determine whether heartbeats are enabled.
121         * <p>Returns {@code false} if {@link #setDefaultHeartbeat defaultHeartbeat}
122         * is set to "0,0", and {@code true} otherwise.
123         */
124        public boolean isDefaultHeartbeatEnabled() {
125                long[] heartbeat = getDefaultHeartbeat();
126                return (heartbeat[0] != 0 && heartbeat[1] != 0);
127        }
128
129        /**
130         * Configure the number of milliseconds before a receipt is considered expired.
131         * <p>By default set to 15,000 (15 seconds).
132         */
133        public void setReceiptTimeLimit(long receiptTimeLimit) {
134                Assert.isTrue(receiptTimeLimit > 0, "Receipt time limit must be larger than zero");
135                this.receiptTimeLimit = receiptTimeLimit;
136        }
137
138        /**
139         * Return the configured receipt time limit.
140         */
141        public long getReceiptTimeLimit() {
142                return this.receiptTimeLimit;
143        }
144
145
146        /**
147         * Factory method for create and configure a new session.
148         * @param connectHeaders headers for the STOMP CONNECT frame
149         * @param handler the handler for the STOMP session
150         * @return the created session
151         */
152        protected ConnectionHandlingStompSession createSession(
153                        @Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
154
155                connectHeaders = processConnectHeaders(connectHeaders);
156                DefaultStompSession session = new DefaultStompSession(handler, connectHeaders);
157                session.setMessageConverter(getMessageConverter());
158                session.setTaskScheduler(getTaskScheduler());
159                session.setReceiptTimeLimit(getReceiptTimeLimit());
160                return session;
161        }
162
163        /**
164         * Further initialize the StompHeaders, for example setting the heart-beat
165         * header if necessary.
166         * @param connectHeaders the headers to modify
167         * @return the modified headers
168         */
169        protected StompHeaders processConnectHeaders(@Nullable StompHeaders connectHeaders) {
170                connectHeaders = (connectHeaders != null ? connectHeaders : new StompHeaders());
171                if (connectHeaders.getHeartbeat() == null) {
172                        connectHeaders.setHeartbeat(getDefaultHeartbeat());
173                }
174                return connectHeaders;
175        }
176
177}