001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.security.Principal;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import org.springframework.lang.Nullable;
029import org.springframework.messaging.Message;
030import org.springframework.messaging.MessageChannel;
031import org.springframework.messaging.MessageDeliveryException;
032import org.springframework.messaging.MessageHandler;
033import org.springframework.messaging.MessageHeaders;
034import org.springframework.messaging.SubscribableChannel;
035import org.springframework.messaging.simp.SimpLogging;
036import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
037import org.springframework.messaging.simp.SimpMessageType;
038import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
039import org.springframework.messaging.support.MessageBuilder;
040import org.springframework.messaging.support.MessageHeaderAccessor;
041import org.springframework.messaging.support.MessageHeaderInitializer;
042import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
043import org.springframework.messaging.tcp.TcpConnection;
044import org.springframework.messaging.tcp.TcpConnectionHandler;
045import org.springframework.messaging.tcp.TcpOperations;
046import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
047import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
048import org.springframework.util.Assert;
049import org.springframework.util.concurrent.ListenableFuture;
050import org.springframework.util.concurrent.ListenableFutureCallback;
051import org.springframework.util.concurrent.ListenableFutureTask;
052
053/**
054 * A {@link org.springframework.messaging.MessageHandler} that handles messages by
055 * forwarding them to a STOMP broker.
056 *
057 * <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
058 * connection to the broker is opened and used exclusively for all messages from the
059 * client that originated the CONNECT message. Messages from the same client are
060 * identified through the session id message header. Reversely, when the STOMP broker
061 * sends messages back on the TCP connection, those messages are enriched with the
062 * session id of the client and sent back downstream through the {@link MessageChannel}
063 * provided to the constructor.
064 *
065 * <p>This class also automatically opens a default "system" TCP connection to the
066 * message broker that is used for sending messages that originate from the server
067 * application (as opposed to from a client). Such messages are not associated with
068 * any client and therefore do not have a session id header. The "system" connection
069 * is effectively shared and cannot be used to receive messages. Several properties
070 * are provided to configure the "system" connection including:
071 * <ul>
072 * <li>{@link #setSystemLogin}</li>
073 * <li>{@link #setSystemPasscode}</li>
074 * <li>{@link #setSystemHeartbeatSendInterval}</li>
075 * <li>{@link #setSystemHeartbeatReceiveInterval}</li>
076 * </ul>
077 *
078 * @author Rossen Stoyanchev
079 * @author Andy Wilkinson
080 * @since 4.0
081 */
082public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
083
084        /**
085         * The system session ID.
086         */
087        public static final String SYSTEM_SESSION_ID = "_system_";
088
089        /** STOMP recommended error of margin for receiving heartbeats. */
090        private static final long HEARTBEAT_MULTIPLIER = 3;
091
092        /**
093         * Heartbeat starts once CONNECTED frame with heartbeat settings is received.
094         * If CONNECTED doesn't arrive within a minute, we'll close the connection.
095         */
096        private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
097
098        private static final byte[] EMPTY_PAYLOAD = new byte[0];
099
100        private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
101
102        private static final Message<byte[]> HEARTBEAT_MESSAGE;
103
104
105        static {
106                EMPTY_TASK.run();
107                StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
108                HEARTBEAT_MESSAGE = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
109        }
110
111
112        private String relayHost = "127.0.0.1";
113
114        private int relayPort = 61613;
115
116        private String clientLogin = "guest";
117
118        private String clientPasscode = "guest";
119
120        private String systemLogin = "guest";
121
122        private String systemPasscode = "guest";
123
124        private long systemHeartbeatSendInterval = 10000;
125
126        private long systemHeartbeatReceiveInterval = 10000;
127
128        private final Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4);
129
130        @Nullable
131        private String virtualHost;
132
133        @Nullable
134        private TcpOperations<byte[]> tcpClient;
135
136        @Nullable
137        private MessageHeaderInitializer headerInitializer;
138
139        private final DefaultStats stats = new DefaultStats();
140
141        private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<>();
142
143
144        /**
145         * Create a StompBrokerRelayMessageHandler instance with the given message channels
146         * and destination prefixes.
147         * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
148         * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
149         * @param brokerChannel the channel for the application to send messages to the broker
150         * @param destinationPrefixes the broker supported destination prefixes; destinations
151         * that do not match the given prefix are ignored.
152         */
153        public StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
154                        SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
155
156                super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes);
157        }
158
159
160        /**
161         * Set the STOMP message broker host.
162         */
163        public void setRelayHost(String relayHost) {
164                Assert.hasText(relayHost, "relayHost must not be empty");
165                this.relayHost = relayHost;
166        }
167
168        /**
169         * Return the STOMP message broker host.
170         */
171        public String getRelayHost() {
172                return this.relayHost;
173        }
174
175        /**
176         * Set the STOMP message broker port.
177         */
178        public void setRelayPort(int relayPort) {
179                this.relayPort = relayPort;
180        }
181
182        /**
183         * Return the STOMP message broker port.
184         */
185        public int getRelayPort() {
186                return this.relayPort;
187        }
188        /**
189         * Set the login to use when creating connections to the STOMP broker on
190         * behalf of connected clients.
191         * <p>By default this is set to "guest".
192         * @see #setSystemLogin(String)
193         */
194        public void setClientLogin(String clientLogin) {
195                Assert.hasText(clientLogin, "clientLogin must not be empty");
196                this.clientLogin = clientLogin;
197        }
198
199        /**
200         * Return the configured login to use for connections to the STOMP broker
201         * on behalf of connected clients.
202         * @see #getSystemLogin()
203         */
204        public String getClientLogin() {
205                return this.clientLogin;
206        }
207
208        /**
209         * Set the client passcode to use to create connections to the STOMP broker on
210         * behalf of connected clients.
211         * <p>By default this is set to "guest".
212         * @see #setSystemPasscode
213         */
214        public void setClientPasscode(String clientPasscode) {
215                Assert.hasText(clientPasscode, "clientPasscode must not be empty");
216                this.clientPasscode = clientPasscode;
217        }
218
219        /**
220         * Return the configured passcode to use for connections to the STOMP broker on
221         * behalf of connected clients.
222         * @see #getSystemPasscode()
223         */
224        public String getClientPasscode() {
225                return this.clientPasscode;
226        }
227
228        /**
229         * Set the login for the shared "system" connection used to send messages to
230         * the STOMP broker from within the application, i.e. messages not associated
231         * with a specific client session (e.g. REST/HTTP request handling method).
232         * <p>By default this is set to "guest".
233         */
234        public void setSystemLogin(String systemLogin) {
235                Assert.hasText(systemLogin, "systemLogin must not be empty");
236                this.systemLogin = systemLogin;
237        }
238
239        /**
240         * Return the login used for the shared "system" connection to the STOMP broker.
241         */
242        public String getSystemLogin() {
243                return this.systemLogin;
244        }
245
246        /**
247         * Set the passcode for the shared "system" connection used to send messages to
248         * the STOMP broker from within the application, i.e. messages not associated
249         * with a specific client session (e.g. REST/HTTP request handling method).
250         * <p>By default this is set to "guest".
251         */
252        public void setSystemPasscode(String systemPasscode) {
253                this.systemPasscode = systemPasscode;
254        }
255
256        /**
257         * Return the passcode used for the shared "system" connection to the STOMP broker.
258         */
259        public String getSystemPasscode() {
260                return this.systemPasscode;
261        }
262
263
264        /**
265         * Set the interval, in milliseconds, at which the "system" connection will, in the
266         * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
267         * of zero will prevent heartbeats from being sent to the broker.
268         * <p>The default value is 10000.
269         * <p>See class-level documentation for more information on the "system" connection.
270         */
271        public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
272                this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
273        }
274
275        /**
276         * Return the interval, in milliseconds, at which the "system" connection will
277         * send heartbeats to the STOMP broker.
278         */
279        public long getSystemHeartbeatSendInterval() {
280                return this.systemHeartbeatSendInterval;
281        }
282
283        /**
284         * Set the maximum interval, in milliseconds, at which the "system" connection
285         * expects, in the absence of any other data, to receive a heartbeat from the STOMP
286         * broker. A value of zero will configure the connection to expect not to receive
287         * heartbeats from the broker.
288         * <p>The default value is 10000.
289         * <p>See class-level documentation for more information on the "system" connection.
290         */
291        public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
292                this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
293        }
294
295        /**
296         * Return the interval, in milliseconds, at which the "system" connection expects
297         * to receive heartbeats from the STOMP broker.
298         */
299        public long getSystemHeartbeatReceiveInterval() {
300                return this.systemHeartbeatReceiveInterval;
301        }
302
303        /**
304         * Configure one more destinations to subscribe to on the shared "system"
305         * connection along with MessageHandler's to handle received messages.
306         * <p>This is for internal use in a multi-application server scenario where
307         * servers forward messages to each other (e.g. unresolved user destinations).
308         * @param subscriptions the destinations to subscribe to.
309         */
310        public void setSystemSubscriptions(@Nullable Map<String, MessageHandler> subscriptions) {
311                this.systemSubscriptions.clear();
312                if (subscriptions != null) {
313                        this.systemSubscriptions.putAll(subscriptions);
314                }
315        }
316
317        /**
318         * Return the configured map with subscriptions on the "system" connection.
319         */
320        public Map<String, MessageHandler> getSystemSubscriptions() {
321                return this.systemSubscriptions;
322        }
323
324        /**
325         * Set the value of the "host" header to use in STOMP CONNECT frames. When this
326         * property is configured, a "host" header will be added to every STOMP frame sent to
327         * the STOMP broker. This may be useful for example in a cloud environment where the
328         * actual host to which the TCP connection is established is different from the host
329         * providing the cloud-based STOMP service.
330         * <p>By default this property is not set.
331         */
332        public void setVirtualHost(@Nullable String virtualHost) {
333                this.virtualHost = virtualHost;
334        }
335
336        /**
337         * Return the configured virtual host value.
338         */
339        @Nullable
340        public String getVirtualHost() {
341                return this.virtualHost;
342        }
343
344        /**
345         * Configure a TCP client for managing TCP connections to the STOMP broker.
346         * <p>By default {@link ReactorNettyTcpClient} is used.
347         * <p><strong>Note:</strong> when this property is used, any
348         * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
349         * specified are effectively ignored.
350         */
351        public void setTcpClient(@Nullable TcpOperations<byte[]> tcpClient) {
352                this.tcpClient = tcpClient;
353        }
354
355        /**
356         * Get the configured TCP client (never {@code null} unless not configured
357         * invoked and this method is invoked before the handler is started and
358         * hence a default implementation initialized).
359         */
360        @Nullable
361        public TcpOperations<byte[]> getTcpClient() {
362                return this.tcpClient;
363        }
364
365        /**
366         * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
367         * messages created through the {@code StompBrokerRelayMessageHandler} that
368         * are sent to the client outbound message channel.
369         * <p>By default this property is not set.
370         */
371        public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer) {
372                this.headerInitializer = headerInitializer;
373        }
374
375        /**
376         * Return the configured header initializer.
377         */
378        @Nullable
379        public MessageHeaderInitializer getHeaderInitializer() {
380                return this.headerInitializer;
381        }
382
383        /**
384         * Return a String describing internal state and counters.
385         * Effectively {@code toString()} on {@link #getStats() getStats()}.
386         */
387        public String getStatsInfo() {
388                return this.stats.toString();
389        }
390
391        /**
392         * Return a structured object with internal state and counters.
393         * @since 5.2
394         */
395        public Stats getStats() {
396                return this.stats;
397        }
398
399
400        /**
401         * Return the current count of TCP connection to the broker.
402         */
403        public int getConnectionCount() {
404                return this.connectionHandlers.size();
405        }
406
407
408        @Override
409        protected void startInternal() {
410                if (this.tcpClient == null) {
411                        this.tcpClient = initTcpClient();
412                }
413
414                if (logger.isInfoEnabled()) {
415                        logger.info("Starting \"system\" session, " + toString());
416                }
417
418                StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
419                accessor.setAcceptVersion("1.1,1.2");
420                accessor.setLogin(this.systemLogin);
421                accessor.setPasscode(this.systemPasscode);
422                accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
423                String virtualHost = getVirtualHost();
424                if (virtualHost != null) {
425                        accessor.setHost(virtualHost);
426                }
427                accessor.setSessionId(SYSTEM_SESSION_ID);
428                if (logger.isDebugEnabled()) {
429                        logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
430                }
431
432                SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
433                this.connectionHandlers.put(handler.getSessionId(), handler);
434
435                this.stats.incrementConnectCount();
436                this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
437        }
438
439        private ReactorNettyTcpClient<byte[]> initTcpClient() {
440                StompDecoder decoder = new StompDecoder();
441                if (this.headerInitializer != null) {
442                        decoder.setHeaderInitializer(this.headerInitializer);
443                }
444                ReactorNettyCodec<byte[]> codec = new StompReactorNettyCodec(decoder);
445                ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec);
446                client.setLogger(SimpLogging.forLog(client.getLogger()));
447                return client;
448        }
449
450        @Override
451        protected void stopInternal() {
452                publishBrokerUnavailableEvent();
453                if (this.tcpClient != null) {
454                        try {
455                                this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
456                        }
457                        catch (Throwable ex) {
458                                logger.error("Error in shutdown of TCP client", ex);
459                        }
460                }
461        }
462
463        @Override
464        protected void handleMessageInternal(Message<?> message) {
465                String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
466
467                if (!isBrokerAvailable()) {
468                        if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) {
469                                throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
470                                                "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
471                        }
472                        StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
473                        if (handler != null) {
474                                handler.sendStompErrorFrameToClient("Broker not available.");
475                                handler.clearConnection();
476                        }
477                        else {
478                                StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
479                                if (getHeaderInitializer() != null) {
480                                        getHeaderInitializer().initHeaders(accessor);
481                                }
482                                accessor.setSessionId(sessionId);
483                                Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders());
484                                if (user != null) {
485                                        accessor.setUser(user);
486                                }
487                                accessor.setMessage("Broker not available.");
488                                MessageHeaders headers = accessor.getMessageHeaders();
489                                getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
490                        }
491                        return;
492                }
493
494                StompHeaderAccessor stompAccessor;
495                StompCommand command;
496
497                MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
498                if (accessor == null) {
499                        throw new IllegalStateException(
500                                        "No header accessor (not using the SimpMessagingTemplate?): " + message);
501                }
502                else if (accessor instanceof StompHeaderAccessor) {
503                        stompAccessor = (StompHeaderAccessor) accessor;
504                        command = stompAccessor.getCommand();
505                }
506                else if (accessor instanceof SimpMessageHeaderAccessor) {
507                        stompAccessor = StompHeaderAccessor.wrap(message);
508                        command = stompAccessor.getCommand();
509                        if (command == null) {
510                                command = stompAccessor.updateStompCommandAsClientMessage();
511                        }
512                }
513                else {
514                        throw new IllegalStateException(
515                                        "Unexpected header accessor type " + accessor.getClass() + " in " + message);
516                }
517
518                if (sessionId == null) {
519                        if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) {
520                                if (logger.isErrorEnabled()) {
521                                        logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
522                                }
523                                return;
524                        }
525                        sessionId = SYSTEM_SESSION_ID;
526                        stompAccessor.setSessionId(sessionId);
527                }
528
529                String destination = stompAccessor.getDestination();
530                if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
531                        return;
532                }
533
534                if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
535                        if (logger.isDebugEnabled()) {
536                                logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
537                        }
538                        stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
539                        stompAccessor.setLogin(this.clientLogin);
540                        stompAccessor.setPasscode(this.clientPasscode);
541                        if (getVirtualHost() != null) {
542                                stompAccessor.setHost(getVirtualHost());
543                        }
544                        StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
545                        this.connectionHandlers.put(sessionId, handler);
546                        this.stats.incrementConnectCount();
547                        Assert.state(this.tcpClient != null, "No TCP client available");
548                        this.tcpClient.connect(handler);
549                }
550                else if (StompCommand.DISCONNECT.equals(command)) {
551                        StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
552                        if (handler == null) {
553                                if (logger.isDebugEnabled()) {
554                                        logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
555                                }
556                                return;
557                        }
558                        this.stats.incrementDisconnectCount();
559                        handler.forward(message, stompAccessor);
560                }
561                else {
562                        StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
563                        if (handler == null) {
564                                if (logger.isDebugEnabled()) {
565                                        logger.debug("No TCP connection for session " + sessionId + " in " + message);
566                                }
567                                return;
568                        }
569                        handler.forward(message, stompAccessor);
570                }
571        }
572
573        @Override
574        public String toString() {
575                return "StompBrokerRelay[" + getTcpClientInfo() + "]";
576        }
577
578        private String getTcpClientInfo() {
579                return this.tcpClient != null ? this.tcpClient.toString() : this.relayHost + ":" + this.relayPort;
580        }
581
582
583        private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
584
585                private final String sessionId;
586
587                private final boolean isRemoteClientSession;
588
589                private final StompHeaderAccessor connectHeaders;
590
591                private final MessageChannel outboundChannel;
592
593                @Nullable
594                private volatile TcpConnection<byte[]> tcpConnection;
595
596                private volatile boolean isStompConnected;
597
598
599                protected StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
600                        this(sessionId, connectHeaders, true);
601                }
602
603                private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
604                        Assert.notNull(sessionId, "'sessionId' must not be null");
605                        Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
606                        this.sessionId = sessionId;
607                        this.connectHeaders = connectHeaders;
608                        this.isRemoteClientSession = isClientSession;
609                        this.outboundChannel = getClientOutboundChannelForSession(sessionId);
610                }
611
612                public String getSessionId() {
613                        return this.sessionId;
614                }
615
616                @Nullable
617                protected TcpConnection<byte[]> getTcpConnection() {
618                        return this.tcpConnection;
619                }
620
621                @Override
622                public void afterConnected(TcpConnection<byte[]> connection) {
623                        if (logger.isDebugEnabled()) {
624                                logger.debug("TCP connection opened in session=" + getSessionId());
625                        }
626                        this.tcpConnection = connection;
627                        connection.onReadInactivity(() -> {
628                                if (this.tcpConnection != null && !this.isStompConnected) {
629                                        handleTcpConnectionFailure("No CONNECTED frame received in " +
630                                                        MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
631                                }
632                        }, MAX_TIME_TO_CONNECTED_FRAME);
633                        connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
634                }
635
636                @Override
637                public void afterConnectFailure(Throwable ex) {
638                        handleTcpConnectionFailure("Failed to connect: " + ex.getMessage(), ex);
639                }
640
641                /**
642                 * Invoked when any TCP connectivity issue is detected, i.e. failure to establish
643                 * the TCP connection, failure to send a message, missed heartbeat, etc.
644                 */
645                protected void handleTcpConnectionFailure(String error, @Nullable Throwable ex) {
646                        if (logger.isInfoEnabled()) {
647                                logger.info("TCP connection failure in session " + this.sessionId + ": " + error, ex);
648                        }
649                        try {
650                                sendStompErrorFrameToClient(error);
651                        }
652                        finally {
653                                try {
654                                        clearConnection();
655                                }
656                                catch (Throwable ex2) {
657                                        if (logger.isDebugEnabled()) {
658                                                logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex2);
659                                        }
660                                }
661                        }
662                }
663
664                private void sendStompErrorFrameToClient(String errorText) {
665                        if (this.isRemoteClientSession) {
666                                StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
667                                if (getHeaderInitializer() != null) {
668                                        getHeaderInitializer().initHeaders(accessor);
669                                }
670                                accessor.setSessionId(this.sessionId);
671                                Principal user = this.connectHeaders.getUser();
672                                if (user != null) {
673                                        accessor.setUser(user);
674                                }
675                                accessor.setMessage(errorText);
676                                accessor.setLeaveMutable(true);
677                                Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
678                                handleInboundMessage(errorMessage);
679                        }
680                }
681
682                protected void handleInboundMessage(Message<?> message) {
683                        if (this.isRemoteClientSession) {
684                                this.outboundChannel.send(message);
685                        }
686                }
687
688                @Override
689                public void handleMessage(Message<byte[]> message) {
690                        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
691                        Assert.state(accessor != null, "No StompHeaderAccessor");
692                        accessor.setSessionId(this.sessionId);
693                        Principal user = this.connectHeaders.getUser();
694                        if (user != null) {
695                                accessor.setUser(user);
696                        }
697
698                        StompCommand command = accessor.getCommand();
699                        if (StompCommand.CONNECTED.equals(command)) {
700                                if (logger.isDebugEnabled()) {
701                                        logger.debug("Received " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
702                                }
703                                afterStompConnected(accessor);
704                        }
705                        else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
706                                logger.error("Received " + accessor.getShortLogMessage(message.getPayload()));
707                        }
708                        else if (logger.isTraceEnabled()) {
709                                logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
710                        }
711
712                        handleInboundMessage(message);
713                }
714
715                /**
716                 * Invoked after the STOMP CONNECTED frame is received. At this point the
717                 * connection is ready for sending STOMP messages to the broker.
718                 */
719                protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
720                        this.isStompConnected = true;
721                        stats.incrementConnectedCount();
722                        initHeartbeats(connectedHeaders);
723                }
724
725                private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
726                        if (this.isRemoteClientSession) {
727                                return;
728                        }
729
730                        TcpConnection<byte[]> con = this.tcpConnection;
731                        Assert.state(con != null, "No TcpConnection available");
732
733                        long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
734                        long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
735                        long serverSendInterval = connectedHeaders.getHeartbeat()[0];
736                        long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];
737
738                        if (clientSendInterval > 0 && serverReceiveInterval > 0) {
739                                long interval = Math.max(clientSendInterval, serverReceiveInterval);
740                                con.onWriteInactivity(() ->
741                                                con.send(HEARTBEAT_MESSAGE).addCallback(
742                                                                result -> {},
743                                                                ex -> handleTcpConnectionFailure(
744                                                                                "Failed to forward heartbeat: " + ex.getMessage(), ex)), interval);
745                        }
746                        if (clientReceiveInterval > 0 && serverSendInterval > 0) {
747                                final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
748                                con.onReadInactivity(
749                                                () -> handleTcpConnectionFailure("No messages received in " + interval + " ms.", null), interval);
750                        }
751                }
752
753                @Override
754                public void handleFailure(Throwable ex) {
755                        if (this.tcpConnection != null) {
756                                handleTcpConnectionFailure("Transport failure: " + ex.getMessage(), ex);
757                        }
758                        else if (logger.isErrorEnabled()) {
759                                logger.error("Transport failure: " + ex);
760                        }
761                }
762
763                @Override
764                public void afterConnectionClosed() {
765                        if (this.tcpConnection == null) {
766                                return;
767                        }
768                        try {
769                                if (logger.isDebugEnabled()) {
770                                        logger.debug("TCP connection to broker closed in session " + this.sessionId);
771                                }
772                                sendStompErrorFrameToClient("Connection to broker closed.");
773                        }
774                        finally {
775                                try {
776                                        // Prevent clearConnection() from trying to close
777                                        this.tcpConnection = null;
778                                        clearConnection();
779                                }
780                                catch (Throwable ex) {
781                                        // Shouldn't happen with connection reset beforehand
782                                }
783                        }
784                }
785
786                /**
787                 * Forward the given message to the STOMP broker.
788                 * <p>The method checks whether we have an active TCP connection and have
789                 * received the STOMP CONNECTED frame. For client messages this should be
790                 * false only if we lose the TCP connection around the same time when a
791                 * client message is being forwarded, so we simply log the ignored message
792                 * at debug level. For messages from within the application being sent on
793                 * the "system" connection an exception is raised so that components sending
794                 * the message have a chance to handle it -- by default the broker message
795                 * channel is synchronous.
796                 * <p>Note that if messages arrive concurrently around the same time a TCP
797                 * connection is lost, there is a brief period of time before the connection
798                 * is reset when one or more messages may sneak through and an attempt made
799                 * to forward them. Rather than synchronizing to guard against that, this
800                 * method simply lets them try and fail. For client sessions that may
801                 * result in an additional STOMP ERROR frame(s) being sent downstream but
802                 * code handling that downstream should be idempotent in such cases.
803                 * @param message the message to send (never {@code null})
804                 * @return a future to wait for the result
805                 */
806                @SuppressWarnings("unchecked")
807                public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
808                        TcpConnection<byte[]> conn = this.tcpConnection;
809
810                        if (!this.isStompConnected || conn == null) {
811                                if (this.isRemoteClientSession) {
812                                        if (logger.isDebugEnabled()) {
813                                                logger.debug("TCP connection closed already, ignoring " +
814                                                                accessor.getShortLogMessage(message.getPayload()));
815                                        }
816                                        return EMPTY_TASK;
817                                }
818                                else {
819                                        throw new IllegalStateException("Cannot forward messages " +
820                                                        (conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
821                                                        "Consider subscribing to receive BrokerAvailabilityEvent's from " +
822                                                        "an ApplicationListener Spring bean. Dropped " +
823                                                        accessor.getShortLogMessage(message.getPayload()));
824                                }
825                        }
826
827                        final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
828                                        MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
829
830                        StompCommand command = accessor.getCommand();
831                        if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
832                                        StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
833                                logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload()));
834                        }
835                        else if (logger.isTraceEnabled()) {
836                                logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
837                        }
838
839                        ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
840                        future.addCallback(new ListenableFutureCallback<Void>() {
841                                @Override
842                                public void onSuccess(@Nullable Void result) {
843                                        if (accessor.getCommand() == StompCommand.DISCONNECT) {
844                                                afterDisconnectSent(accessor);
845                                        }
846                                }
847                                @Override
848                                public void onFailure(Throwable ex) {
849                                        if (tcpConnection != null) {
850                                                handleTcpConnectionFailure("failed to forward " +
851                                                                accessor.getShortLogMessage(message.getPayload()), ex);
852                                        }
853                                        else if (logger.isErrorEnabled()) {
854                                                logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()));
855                                        }
856                                }
857                        });
858                        return future;
859                }
860
861                /**
862                 * After a DISCONNECT there should be no more client frames so we can
863                 * close the connection pro-actively. However, if the DISCONNECT has a
864                 * receipt header we leave the connection open and expect the server will
865                 * respond with a RECEIPT and then close the connection.
866                 * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#DISCONNECT">
867                 *     STOMP Specification 1.2 DISCONNECT</a>
868                 */
869                private void afterDisconnectSent(StompHeaderAccessor accessor) {
870                        if (accessor.getReceipt() == null) {
871                                try {
872                                        clearConnection();
873                                }
874                                catch (Throwable ex) {
875                                        if (logger.isDebugEnabled()) {
876                                                logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex);
877                                        }
878                                }
879                        }
880                }
881
882                /**
883                 * Clean up state associated with the connection and close it.
884                 * Any exception arising from closing the connection are propagated.
885                 */
886                public void clearConnection() {
887                        if (logger.isDebugEnabled()) {
888                                logger.debug("Cleaning up connection state for session " + this.sessionId);
889                        }
890
891                        if (this.isRemoteClientSession) {
892                                StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
893                        }
894
895                        this.isStompConnected = false;
896
897                        TcpConnection<byte[]> conn = this.tcpConnection;
898                        this.tcpConnection = null;
899                        if (conn != null) {
900                                if (logger.isDebugEnabled()) {
901                                        logger.debug("Closing TCP connection in session " + this.sessionId);
902                                }
903                                conn.close();
904                        }
905                }
906
907                @Override
908                public String toString() {
909                        return "StompConnectionHandler[sessionId=" + this.sessionId + "]";
910                }
911        }
912
913
914        private class SystemStompConnectionHandler extends StompConnectionHandler {
915
916                public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
917                        super(SYSTEM_SESSION_ID, connectHeaders, false);
918                }
919
920                @Override
921                protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
922                        if (logger.isInfoEnabled()) {
923                                logger.info("\"System\" session connected.");
924                        }
925                        super.afterStompConnected(connectedHeaders);
926                        publishBrokerAvailableEvent();
927                        sendSystemSubscriptions();
928                }
929
930                private void sendSystemSubscriptions() {
931                        int i = 0;
932                        for (String destination : getSystemSubscriptions().keySet()) {
933                                StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
934                                accessor.setSubscriptionId(String.valueOf(i++));
935                                accessor.setDestination(destination);
936                                if (logger.isDebugEnabled()) {
937                                        logger.debug("Subscribing to " + destination + " on \"system\" connection.");
938                                }
939                                TcpConnection<byte[]> conn = getTcpConnection();
940                                if (conn != null) {
941                                        MessageHeaders headers = accessor.getMessageHeaders();
942                                        conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
943                                                        result -> {},
944                                                        ex -> {
945                                                                String error = "Failed to subscribe in \"system\" session.";
946                                                                handleTcpConnectionFailure(error, ex);
947                                                        });
948                                }
949                        }
950                }
951
952                @Override
953                protected void handleInboundMessage(Message<?> message) {
954                        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
955                        if (accessor != null && StompCommand.MESSAGE.equals(accessor.getCommand())) {
956                                String destination = accessor.getDestination();
957                                if (destination == null) {
958                                        if (logger.isDebugEnabled()) {
959                                                logger.debug("Got message on \"system\" connection, with no destination: " +
960                                                                accessor.getDetailedLogMessage(message.getPayload()));
961                                        }
962                                        return;
963                                }
964                                if (!getSystemSubscriptions().containsKey(destination)) {
965                                        if (logger.isDebugEnabled()) {
966                                                logger.debug("Got message on \"system\" connection with no handler: " +
967                                                                accessor.getDetailedLogMessage(message.getPayload()));
968                                        }
969                                        return;
970                                }
971                                try {
972                                        MessageHandler handler = getSystemSubscriptions().get(destination);
973                                        handler.handleMessage(message);
974                                }
975                                catch (Throwable ex) {
976                                        if (logger.isDebugEnabled()) {
977                                                logger.debug("Error while handling message on \"system\" connection.", ex);
978                                        }
979                                }
980                        }
981                }
982
983                @Override
984                protected void handleTcpConnectionFailure(String errorMessage, @Nullable Throwable ex) {
985                        super.handleTcpConnectionFailure(errorMessage, ex);
986                        publishBrokerUnavailableEvent();
987                }
988
989                @Override
990                public void afterConnectionClosed() {
991                        super.afterConnectionClosed();
992                        publishBrokerUnavailableEvent();
993                }
994
995                @Override
996                public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
997                        try {
998                                ListenableFuture<Void> future = super.forward(message, accessor);
999                                if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
1000                                        future.get();
1001                                }
1002                                return future;
1003                        }
1004                        catch (Throwable ex) {
1005                                throw new MessageDeliveryException(message, ex);
1006                        }
1007                }
1008        }
1009
1010
1011        private static class VoidCallable implements Callable<Void> {
1012
1013                @Override
1014                public Void call() {
1015                        return null;
1016                }
1017        }
1018
1019
1020        /**
1021         * Contract for access to session counters.
1022         * @since 5.2
1023         */
1024        public interface Stats {
1025
1026                /**
1027                 * The number of connection handlers.
1028                 */
1029                int getTotalHandlers();
1030
1031                /**
1032                 * The number of CONNECT frames processed.
1033                 */
1034                int getTotalConnect();
1035
1036                /**
1037                 * The number of CONNECTED frames processed.
1038                 */
1039                int getTotalConnected();
1040
1041                /**
1042                 * The number of DISCONNECT frames processed.
1043                 */
1044                int getTotalDisconnect();
1045        }
1046
1047
1048        private class DefaultStats implements Stats {
1049
1050                private final AtomicInteger connect = new AtomicInteger();
1051
1052                private final AtomicInteger connected = new AtomicInteger();
1053
1054                private final AtomicInteger disconnect = new AtomicInteger();
1055
1056                public void incrementConnectCount() {
1057                        this.connect.incrementAndGet();
1058                }
1059
1060                public void incrementConnectedCount() {
1061                        this.connected.incrementAndGet();
1062                }
1063
1064                public void incrementDisconnectCount() {
1065                        this.disconnect.incrementAndGet();
1066                }
1067
1068                @Override
1069                public int getTotalHandlers() {
1070                        return connectionHandlers.size();
1071                }
1072
1073                @Override
1074                public int getTotalConnect() {
1075                        return this.connect.get();
1076                }
1077
1078                @Override
1079                public int getTotalConnected() {
1080                        return this.connected.get();
1081                }
1082
1083                @Override
1084                public int getTotalDisconnect() {
1085                        return this.disconnect.get();
1086                }
1087
1088                @Override
1089                public String toString() {
1090                        return (connectionHandlers.size() + " sessions, " + getTcpClientInfo() +
1091                                        (isBrokerAvailable() ? " (available)" : " (not available)") +
1092                                        ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
1093                                        this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");
1094                }
1095        }
1096
1097}