001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.util.Collection;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.concurrent.Callable;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import org.springframework.messaging.Message;
028import org.springframework.messaging.MessageChannel;
029import org.springframework.messaging.MessageDeliveryException;
030import org.springframework.messaging.MessageHandler;
031import org.springframework.messaging.MessageHeaders;
032import org.springframework.messaging.SubscribableChannel;
033import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
034import org.springframework.messaging.simp.SimpMessageType;
035import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
036import org.springframework.messaging.support.MessageBuilder;
037import org.springframework.messaging.support.MessageHeaderAccessor;
038import org.springframework.messaging.support.MessageHeaderInitializer;
039import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
040import org.springframework.messaging.tcp.TcpConnection;
041import org.springframework.messaging.tcp.TcpConnectionHandler;
042import org.springframework.messaging.tcp.TcpOperations;
043import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
044import org.springframework.util.Assert;
045import org.springframework.util.concurrent.ListenableFuture;
046import org.springframework.util.concurrent.ListenableFutureCallback;
047import org.springframework.util.concurrent.ListenableFutureTask;
048
049/**
050 * A {@link org.springframework.messaging.MessageHandler} that handles messages by
051 * forwarding them to a STOMP broker.
052 *
053 * <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
054 * connection to the broker is opened and used exclusively for all messages from the
055 * client that originated the CONNECT message. Messages from the same client are
056 * identified through the session id message header. Conversely, when the STOMP broker
057 * sends messages back on the TCP connection, those messages are enriched with the
058 * session id of the client and sent back downstream through the {@link MessageChannel}
059 * provided to the constructor.
060 *
061 * <p>This class also automatically opens a default "system" TCP connection to the
062 * message broker that is used for sending messages that originate from the server
063 * application (as opposed to from a client). Such messages are not associated with
064 * any client and therefore do not have a session id header. The "system" connection
065 * is effectively shared and cannot be used to receive messages. Several properties
066 * are provided to configure the "system" connection including:
067 * <ul>
068 * <li>{@link #setSystemLogin}</li>
069 * <li>{@link #setSystemPasscode}</li>
070 * <li>{@link #setSystemHeartbeatSendInterval}</li>
071 * <li>{@link #setSystemHeartbeatReceiveInterval}</li>
072 * </ul>
073 *
074 * @author Rossen Stoyanchev
075 * @author Andy Wilkinson
076 * @since 4.0
077 */
078public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
079
080        public static final String SYSTEM_SESSION_ID = "_system_";
081
082        // STOMP recommends a margin of error for receiving heartbeats
083        private static final long HEARTBEAT_MULTIPLIER = 3;
084
085        /**
086         * A heartbeat is set up once a CONNECTED frame is received which contains the heartbeat settings
087         * we need. If we don't receive CONNECTED within a minute, the connection is closed proactively.
088         */
089        private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
090
091        private static final byte[] EMPTY_PAYLOAD = new byte[0];
092
093        private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<Void>(new VoidCallable());
094
095        private static final Message<byte[]> HEARTBEAT_MESSAGE;
096
097
098        static {
099                EMPTY_TASK.run();
100                StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
101                HEARTBEAT_MESSAGE = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
102        }
103
104
105        private String relayHost = "127.0.0.1";
106
107        private int relayPort = 61613;
108
109        private String clientLogin = "guest";
110
111        private String clientPasscode = "guest";
112
113        private String systemLogin = "guest";
114
115        private String systemPasscode = "guest";
116
117        private long systemHeartbeatSendInterval = 10000;
118
119        private long systemHeartbeatReceiveInterval = 10000;
120
121        private final Map<String, MessageHandler> systemSubscriptions = new HashMap<String, MessageHandler>(4);
122
123        private String virtualHost;
124
125        private TcpOperations<byte[]> tcpClient;
126
127        private MessageHeaderInitializer headerInitializer;
128
129        private final Stats stats = new Stats();
130
131        private final Map<String, StompConnectionHandler> connectionHandlers =
132                        new ConcurrentHashMap<String, StompConnectionHandler>();
133
134
/**
 * Create a relay handler wired to the given message channels and limited to
 * the given destination prefixes. All arguments are passed straight through
 * to the superclass constructor.
 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
 * @param brokerChannel the channel for the application to send messages to the broker
 * @param destinationPrefixes the broker supported destination prefixes; destinations
 * that do not match the given prefix are ignored
 */
public StompBrokerRelayMessageHandler(
                SubscribableChannel inboundChannel,
                MessageChannel outboundChannel,
                SubscribableChannel brokerChannel,
                Collection<String> destinationPrefixes) {

        super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes);
}
149
150
151        /**
152         * Set the STOMP message broker host.
153         */
154        public void setRelayHost(String relayHost) {
155                Assert.hasText(relayHost, "relayHost must not be empty");
156                this.relayHost = relayHost;
157        }
158
159        /**
160         * Return the STOMP message broker host.
161         */
162        public String getRelayHost() {
163                return this.relayHost;
164        }
165
166        /**
167         * Set the STOMP message broker port.
168         */
169        public void setRelayPort(int relayPort) {
170                this.relayPort = relayPort;
171        }
172
173        /**
174         * Return the STOMP message broker port.
175         */
176        public int getRelayPort() {
177                return this.relayPort;
178        }
179        /**
180         * Set the login to use when creating connections to the STOMP broker on
181         * behalf of connected clients.
182         * <p>By default this is set to "guest".
183         * @see #setSystemLogin(String)
184         */
185        public void setClientLogin(String clientLogin) {
186                Assert.hasText(clientLogin, "clientLogin must not be empty");
187                this.clientLogin = clientLogin;
188        }
189
190        /**
191         * Return the configured login to use for connections to the STOMP broker
192         * on behalf of connected clients.
193         * @see #getSystemLogin()
194         */
195        public String getClientLogin() {
196                return this.clientLogin;
197        }
198
199        /**
200         * Set the client passcode to use to create connections to the STOMP broker on
201         * behalf of connected clients.
202         * <p>By default this is set to "guest".
203         * @see #setSystemPasscode
204         */
205        public void setClientPasscode(String clientPasscode) {
206                Assert.hasText(clientPasscode, "clientPasscode must not be empty");
207                this.clientPasscode = clientPasscode;
208        }
209
210        /**
211         * Return the configured passcode to use for connections to the STOMP broker on
212         * behalf of connected clients.
213         * @see #getSystemPasscode()
214         */
215        public String getClientPasscode() {
216                return this.clientPasscode;
217        }
218
219        /**
220         * Set the login for the shared "system" connection used to send messages to
221         * the STOMP broker from within the application, i.e. messages not associated
222         * with a specific client session (e.g. REST/HTTP request handling method).
223         * <p>By default this is set to "guest".
224         */
225        public void setSystemLogin(String systemLogin) {
226                Assert.hasText(systemLogin, "systemLogin must not be empty");
227                this.systemLogin = systemLogin;
228        }
229
230        /**
231         * Return the login used for the shared "system" connection to the STOMP broker.
232         */
233        public String getSystemLogin() {
234                return this.systemLogin;
235        }
236
237        /**
238         * Set the passcode for the shared "system" connection used to send messages to
239         * the STOMP broker from within the application, i.e. messages not associated
240         * with a specific client session (e.g. REST/HTTP request handling method).
241         * <p>By default this is set to "guest".
242         */
243        public void setSystemPasscode(String systemPasscode) {
244                this.systemPasscode = systemPasscode;
245        }
246
247        /**
248         * Return the passcode used for the shared "system" connection to the STOMP broker.
249         */
250        public String getSystemPasscode() {
251                return this.systemPasscode;
252        }
253
254
255        /**
256         * Set the interval, in milliseconds, at which the "system" connection will, in the
257         * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
258         * of zero will prevent heartbeats from being sent to the broker.
259         * <p>The default value is 10000.
260         * <p>See class-level documentation for more information on the "system" connection.
261         */
262        public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
263                this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
264        }
265
266        /**
267         * Return the interval, in milliseconds, at which the "system" connection will
268         * send heartbeats to the STOMP broker.
269         */
270        public long getSystemHeartbeatSendInterval() {
271                return this.systemHeartbeatSendInterval;
272        }
273
274        /**
275         * Set the maximum interval, in milliseconds, at which the "system" connection
276         * expects, in the absence of any other data, to receive a heartbeat from the STOMP
277         * broker. A value of zero will configure the connection to expect not to receive
278         * heartbeats from the broker.
279         * <p>The default value is 10000.
280         * <p>See class-level documentation for more information on the "system" connection.
281         */
282        public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
283                this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
284        }
285
286        /**
287         * Return the interval, in milliseconds, at which the "system" connection expects
288         * to receive heartbeats from the STOMP broker.
289         */
290        public long getSystemHeartbeatReceiveInterval() {
291                return this.systemHeartbeatReceiveInterval;
292        }
293
294        /**
295         * Configure one more destinations to subscribe to on the shared "system"
296         * connection along with MessageHandler's to handle received messages.
297         * <p>This is for internal use in a multi-application server scenario where
298         * servers forward messages to each other (e.g. unresolved user destinations).
299         * @param subscriptions the destinations to subscribe to.
300         */
301        public void setSystemSubscriptions(Map<String, MessageHandler> subscriptions) {
302                this.systemSubscriptions.clear();
303                if (subscriptions != null) {
304                        this.systemSubscriptions.putAll(subscriptions);
305                }
306        }
307
308        /**
309         * Return the configured map with subscriptions on the "system" connection.
310         */
311        public Map<String, MessageHandler> getSystemSubscriptions() {
312                return this.systemSubscriptions;
313        }
314
315        /**
316         * Set the value of the "host" header to use in STOMP CONNECT frames. When this
317         * property is configured, a "host" header will be added to every STOMP frame sent to
318         * the STOMP broker. This may be useful for example in a cloud environment where the
319         * actual host to which the TCP connection is established is different from the host
320         * providing the cloud-based STOMP service.
321         * <p>By default this property is not set.
322         */
323        public void setVirtualHost(String virtualHost) {
324                this.virtualHost = virtualHost;
325        }
326
327        /**
328         * Return the configured virtual host value.
329         */
330        public String getVirtualHost() {
331                return this.virtualHost;
332        }
333
334        /**
335         * Configure a TCP client for managing TCP connections to the STOMP broker.
336         * By default {@link Reactor2TcpClient} is used.
337         * <p><strong>Note:</strong> when this property is used, any
338         * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
339         * specified are effectively ignored.
340         */
341        public void setTcpClient(TcpOperations<byte[]> tcpClient) {
342                this.tcpClient = tcpClient;
343        }
344
345        /**
346         * Get the configured TCP client (never {@code null} unless not configured
347         * invoked and this method is invoked before the handler is started and
348         * hence a default implementation initialized).
349         */
350        public TcpOperations<byte[]> getTcpClient() {
351                return this.tcpClient;
352        }
353
354        /**
355         * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
356         * messages created through the {@code StompBrokerRelayMessageHandler} that
357         * are sent to the client outbound message channel.
358         * <p>By default this property is not set.
359         */
360        public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
361                this.headerInitializer = headerInitializer;
362        }
363
364        /**
365         * Return the configured header initializer.
366         */
367        public MessageHeaderInitializer getHeaderInitializer() {
368                return this.headerInitializer;
369        }
370
371        /**
372         * Return a String describing internal state and counters.
373         */
374        public String getStatsInfo() {
375                return this.stats.toString();
376        }
377
378        /**
379         * Return the current count of TCP connection to the broker.
380         */
381        public int getConnectionCount() {
382                return this.connectionHandlers.size();
383        }
384
385
386        @Override
387        protected void startInternal() {
388                if (this.tcpClient == null) {
389                        StompDecoder decoder = new StompDecoder();
390                        decoder.setHeaderInitializer(getHeaderInitializer());
391                        Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
392                        this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
393                }
394
395                if (logger.isInfoEnabled()) {
396                        logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
397                }
398
399                StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
400                accessor.setAcceptVersion("1.1,1.2");
401                accessor.setLogin(this.systemLogin);
402                accessor.setPasscode(this.systemPasscode);
403                accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
404                accessor.setHost(getVirtualHost());
405                accessor.setSessionId(SYSTEM_SESSION_ID);
406                if (logger.isDebugEnabled()) {
407                        logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
408                }
409
410                SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
411                this.connectionHandlers.put(handler.getSessionId(), handler);
412
413                this.stats.incrementConnectCount();
414                this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
415        }
416
417        @Override
418        protected void stopInternal() {
419                publishBrokerUnavailableEvent();
420                try {
421                        this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
422                }
423                catch (Throwable ex) {
424                        logger.error("Error in shutdown of TCP client", ex);
425                }
426        }
427
428        @Override
429        protected void handleMessageInternal(Message<?> message) {
430                String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
431
432                if (!isBrokerAvailable()) {
433                        if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) {
434                                throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
435                                                "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
436                        }
437                        StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
438                        if (handler != null) {
439                                handler.sendStompErrorFrameToClient("Broker not available.");
440                                handler.clearConnection();
441                        }
442                        else {
443                                StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
444                                if (getHeaderInitializer() != null) {
445                                        getHeaderInitializer().initHeaders(accessor);
446                                }
447                                accessor.setSessionId(sessionId);
448                                accessor.setUser(SimpMessageHeaderAccessor.getUser(message.getHeaders()));
449                                accessor.setMessage("Broker not available.");
450                                MessageHeaders headers = accessor.getMessageHeaders();
451                                getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
452                        }
453                        return;
454                }
455
456                StompHeaderAccessor stompAccessor;
457                StompCommand command;
458
459                MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
460                if (accessor == null) {
461                        throw new IllegalStateException(
462                                        "No header accessor (not using the SimpMessagingTemplate?): " + message);
463                }
464                else if (accessor instanceof StompHeaderAccessor) {
465                        stompAccessor = (StompHeaderAccessor) accessor;
466                        command = stompAccessor.getCommand();
467                }
468                else if (accessor instanceof SimpMessageHeaderAccessor) {
469                        stompAccessor = StompHeaderAccessor.wrap(message);
470                        command = stompAccessor.getCommand();
471                        if (command == null) {
472                                command = stompAccessor.updateStompCommandAsClientMessage();
473                        }
474                }
475                else {
476                        throw new IllegalStateException(
477                                        "Unexpected header accessor type " + accessor.getClass() + " in " + message);
478                }
479
480                if (sessionId == null) {
481                        if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) {
482                                if (logger.isErrorEnabled()) {
483                                        logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
484                                }
485                                return;
486                        }
487                        sessionId = SYSTEM_SESSION_ID;
488                        stompAccessor.setSessionId(sessionId);
489                }
490
491                String destination = stompAccessor.getDestination();
492                if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
493                        return;
494                }
495
496                if (StompCommand.CONNECT.equals(command)) {
497                        if (logger.isDebugEnabled()) {
498                                logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
499                        }
500                        stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
501                        stompAccessor.setLogin(this.clientLogin);
502                        stompAccessor.setPasscode(this.clientPasscode);
503                        if (getVirtualHost() != null) {
504                                stompAccessor.setHost(getVirtualHost());
505                        }
506                        StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
507                        this.connectionHandlers.put(sessionId, handler);
508                        this.stats.incrementConnectCount();
509                        this.tcpClient.connect(handler);
510                }
511                else if (StompCommand.DISCONNECT.equals(command)) {
512                        StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
513                        if (handler == null) {
514                                if (logger.isDebugEnabled()) {
515                                        logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
516                                }
517                                return;
518                        }
519                        stats.incrementDisconnectCount();
520                        handler.forward(message, stompAccessor);
521                }
522                else {
523                        StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
524                        if (handler == null) {
525                                if (logger.isDebugEnabled()) {
526                                        logger.debug("No TCP connection for session " + sessionId + " in " + message);
527                                }
528                                return;
529                        }
530                        handler.forward(message, stompAccessor);
531                }
532        }
533
534        @Override
535        public String toString() {
536                return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]";
537        }
538
539
540        private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
541
542                private final String sessionId;
543
544                private final boolean isRemoteClientSession;
545
546                private final StompHeaderAccessor connectHeaders;
547
548                private volatile TcpConnection<byte[]> tcpConnection;
549
550                private volatile boolean isStompConnected;
551
552
/**
 * Create a handler for a remote client session, i.e. with the
 * "is client session" flag set to {@code true}.
 * @param sessionId the id of the session the connection belongs to
 * @param connectHeaders the headers of the CONNECT frame to send to the broker
 */
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
        this(sessionId, connectHeaders, true);
}
556
/**
 * Create a handler for the given session.
 * @param sessionId the id of the session the connection belongs to (required)
 * @param connectHeaders the headers of the CONNECT frame to send to the broker (required)
 * @param isClientSession whether the session belongs to a remote client
 */
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
        // Validate both required arguments before touching any state
        Assert.notNull(sessionId, "'sessionId' must not be null");
        Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
        this.isRemoteClientSession = isClientSession;
        this.sessionId = sessionId;
        this.connectHeaders = connectHeaders;
}
564
565                public String getSessionId() {
566                        return this.sessionId;
567                }
568
569                protected TcpConnection<byte[]> getTcpConnection() {
570                        return this.tcpConnection;
571                }
572
573                @Override
574                public void afterConnected(TcpConnection<byte[]> connection) {
575                        if (logger.isDebugEnabled()) {
576                                logger.debug("TCP connection opened in session=" + getSessionId());
577                        }
578                        this.tcpConnection = connection;
579                        this.tcpConnection.onReadInactivity(new Runnable() {
580                                @Override
581                                public void run() {
582                                        if (tcpConnection != null && !isStompConnected) {
583                                                handleTcpConnectionFailure("No CONNECTED frame received in " +
584                                                                MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
585                                        }
586                                }
587                        }, MAX_TIME_TO_CONNECTED_FRAME);
588                        connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
589                }
590
591                @Override
592                public void afterConnectFailure(Throwable ex) {
593                        handleTcpConnectionFailure("Failed to connect: " + ex.getMessage(), ex);
594                }
595
596                /**
597                 * Invoked when any TCP connectivity issue is detected, i.e. failure to establish
598                 * the TCP connection, failure to send a message, missed heartbeat, etc.
599                 */
600                protected void handleTcpConnectionFailure(String error, Throwable ex) {
601                        if (logger.isErrorEnabled()) {
602                                logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex);
603                        }
604                        try {
605                                sendStompErrorFrameToClient(error);
606                        }
607                        finally {
608                                try {
609                                        clearConnection();
610                                }
611                                catch (Throwable ex2) {
612                                        if (logger.isDebugEnabled()) {
613                                                logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex2);
614                                        }
615                                }
616                        }
617                }
618
619                private void sendStompErrorFrameToClient(String errorText) {
620                        if (this.isRemoteClientSession) {
621                                StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
622                                if (getHeaderInitializer() != null) {
623                                        getHeaderInitializer().initHeaders(headerAccessor);
624                                }
625                                headerAccessor.setSessionId(this.sessionId);
626                                headerAccessor.setUser(this.connectHeaders.getUser());
627                                headerAccessor.setMessage(errorText);
628                                Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
629                                handleInboundMessage(errorMessage);
630                        }
631                }
632
633                protected void handleInboundMessage(Message<?> message) {
634                        if (this.isRemoteClientSession) {
635                                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
636                                accessor.setImmutable();
637                                StompBrokerRelayMessageHandler.this.getClientOutboundChannel().send(message);
638                        }
639                }
640
641                @Override
642                public void handleMessage(Message<byte[]> message) {
643                        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
644                        accessor.setSessionId(this.sessionId);
645                        accessor.setUser(this.connectHeaders.getUser());
646
647                        StompCommand command = accessor.getCommand();
648                        if (StompCommand.CONNECTED.equals(command)) {
649                                if (logger.isDebugEnabled()) {
650                                        logger.debug("Received " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
651                                }
652                                afterStompConnected(accessor);
653                        }
654                        else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
655                                logger.error("Received " + accessor.getShortLogMessage(message.getPayload()));
656                        }
657                        else if (logger.isTraceEnabled()) {
658                                logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
659                        }
660
661                        handleInboundMessage(message);
662                }
663
664                /**
665                 * Invoked after the STOMP CONNECTED frame is received. At this point the
666                 * connection is ready for sending STOMP messages to the broker.
667                 */
668                protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
669                        this.isStompConnected = true;
670                        stats.incrementConnectedCount();
671                        initHeartbeats(connectedHeaders);
672                }
673
674                private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
675                        if (this.isRemoteClientSession) {
676                                return;
677                        }
678
679                        long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
680                        long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
681                        long serverSendInterval = connectedHeaders.getHeartbeat()[0];
682                        long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];
683
684                        if (clientSendInterval > 0 && serverReceiveInterval > 0) {
685                                long interval = Math.max(clientSendInterval,  serverReceiveInterval);
686                                this.tcpConnection.onWriteInactivity(new Runnable() {
687                                        @Override
688                                        public void run() {
689                                                TcpConnection<byte[]> conn = tcpConnection;
690                                                if (conn != null) {
691                                                        conn.send(HEARTBEAT_MESSAGE).addCallback(
692                                                                        new ListenableFutureCallback<Void>() {
693                                                                                public void onSuccess(Void result) {
694                                                                                }
695                                                                                public void onFailure(Throwable ex) {
696                                                                                        handleTcpConnectionFailure(
697                                                                                                        "Failed to forward heartbeat: " + ex.getMessage(), ex);
698                                                                                }
699                                                                        });
700                                                }
701                                        }
702                                }, interval);
703                        }
704                        if (clientReceiveInterval > 0 && serverSendInterval > 0) {
705                                final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
706                                this.tcpConnection.onReadInactivity(new Runnable() {
707                                        @Override
708                                        public void run() {
709                                                handleTcpConnectionFailure("No messages received in " + interval + " ms.", null);
710                                        }
711                                }, interval);
712                        }
713                }
714
715                @Override
716                public void handleFailure(Throwable ex) {
717                        if (this.tcpConnection != null) {
718                                handleTcpConnectionFailure("Transport failure: " + ex.getMessage(), ex);
719                        }
720                        else if (logger.isErrorEnabled()) {
721                                logger.error("Transport failure: " + ex);
722                        }
723                }
724
725                @Override
726                public void afterConnectionClosed() {
727                        if (this.tcpConnection == null) {
728                                return;
729                        }
730                        try {
731                                if (logger.isDebugEnabled()) {
732                                        logger.debug("TCP connection to broker closed in session " + this.sessionId);
733                                }
734                                sendStompErrorFrameToClient("Connection to broker closed.");
735                        }
736                        finally {
737                                try {
738                                        // Prevent clearConnection() from trying to close
739                                        this.tcpConnection = null;
740                                        clearConnection();
741                                }
742                                catch (Throwable ex) {
743                                        // Shouldn't happen with connection reset beforehand
744                                }
745                        }
746                }
747
748                /**
749                 * Forward the given message to the STOMP broker.
750                 * <p>The method checks whether we have an active TCP connection and have
751                 * received the STOMP CONNECTED frame. For client messages this should be
752                 * false only if we lose the TCP connection around the same time when a
753                 * client message is being forwarded, so we simply log the ignored message
754                 * at debug level. For messages from within the application being sent on
755                 * the "system" connection an exception is raised so that components sending
756                 * the message have a chance to handle it -- by default the broker message
757                 * channel is synchronous.
758                 * <p>Note that if messages arrive concurrently around the same time a TCP
759                 * connection is lost, there is a brief period of time before the connection
760                 * is reset when one or more messages may sneak through and an attempt made
761                 * to forward them. Rather than synchronizing to guard against that, this
762                 * method simply lets them try and fail. For client sessions that may
763                 * result in an additional STOMP ERROR frame(s) being sent downstream but
764                 * code handling that downstream should be idempotent in such cases.
765                 * @param message the message to send (never {@code null})
766                 * @return a future to wait for the result
767                 */
768                @SuppressWarnings("unchecked")
769                public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
770                        TcpConnection<byte[]> conn = this.tcpConnection;
771
772                        if (!this.isStompConnected || conn == null) {
773                                if (this.isRemoteClientSession) {
774                                        if (logger.isDebugEnabled()) {
775                                                logger.debug("TCP connection closed already, ignoring " +
776                                                                accessor.getShortLogMessage(message.getPayload()));
777                                        }
778                                        return EMPTY_TASK;
779                                }
780                                else {
781                                        throw new IllegalStateException("Cannot forward messages " +
782                                                        (conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
783                                                        "Consider subscribing to receive BrokerAvailabilityEvent's from " +
784                                                        "an ApplicationListener Spring bean. Dropped " +
785                                                        accessor.getShortLogMessage(message.getPayload()));
786                                }
787                        }
788
789                        final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
790                                        MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
791
792                        StompCommand command = accessor.getCommand();
793                        if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
794                                        StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
795                                logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload()));
796                        }
797                        else if (logger.isTraceEnabled()) {
798                                logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
799                        }
800
801                        ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
802                        future.addCallback(new ListenableFutureCallback<Void>() {
803                                @Override
804                                public void onSuccess(Void result) {
805                                        if (accessor.getCommand() == StompCommand.DISCONNECT) {
806                                                afterDisconnectSent(accessor);
807                                        }
808                                }
809                                @Override
810                                public void onFailure(Throwable ex) {
811                                        if (tcpConnection != null) {
812                                                handleTcpConnectionFailure("failed to forward " +
813                                                                accessor.getShortLogMessage(message.getPayload()), ex);
814                                        }
815                                        else if (logger.isErrorEnabled()) {
816                                                logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()));
817                                        }
818                                }
819                        });
820                        return future;
821                }
822
823                /**
824                 * After a DISCONNECT there should be no more client frames so we can
825                 * close the connection pro-actively. However, if the DISCONNECT has a
826                 * receipt header we leave the connection open and expect the server will
827                 * respond with a RECEIPT and then close the connection.
828                 * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#DISCONNECT">
829                 *     STOMP Specification 1.2 DISCONNECT</a>
830                 */
831                private void afterDisconnectSent(StompHeaderAccessor accessor) {
832                        if (accessor.getReceipt() == null) {
833                                try {
834                                        clearConnection();
835                                }
836                                catch (Throwable ex) {
837                                        if (logger.isDebugEnabled()) {
838                                                logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex);
839                                        }
840                                }
841                        }
842                }
843
844                /**
845                 * Clean up state associated with the connection and close it.
846                 * Any exception arising from closing the connection are propagated.
847                 */
848                public void clearConnection() {
849                        if (logger.isDebugEnabled()) {
850                                logger.debug("Cleaning up connection state for session " + this.sessionId);
851                        }
852
853                        if (this.isRemoteClientSession) {
854                                StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
855                        }
856
857                        this.isStompConnected = false;
858
859                        TcpConnection<byte[]> conn = this.tcpConnection;
860                        this.tcpConnection = null;
861                        if (conn != null) {
862                                if (logger.isDebugEnabled()) {
863                                        logger.debug("Closing TCP connection in session " + this.sessionId);
864                                }
865                                conn.close();
866                        }
867                }
868
869                @Override
870                public String toString() {
871                        return "StompConnectionHandler[sessionId=" + this.sessionId + "]";
872                }
873        }
874
875
876        private class SystemStompConnectionHandler extends StompConnectionHandler {
877
878                public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
879                        super(SYSTEM_SESSION_ID, connectHeaders, false);
880                }
881
882                @Override
883                protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
884                        if (logger.isInfoEnabled()) {
885                                logger.info("\"System\" session connected.");
886                        }
887                        super.afterStompConnected(connectedHeaders);
888                        publishBrokerAvailableEvent();
889                        sendSystemSubscriptions();
890                }
891
892                private void sendSystemSubscriptions() {
893                        int i = 0;
894                        for (String destination : getSystemSubscriptions().keySet()) {
895                                StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
896                                accessor.setSubscriptionId(String.valueOf(i++));
897                                accessor.setDestination(destination);
898                                if (logger.isDebugEnabled()) {
899                                        logger.debug("Subscribing to " + destination + " on \"system\" connection.");
900                                }
901                                TcpConnection<byte[]> conn = getTcpConnection();
902                                if (conn != null) {
903                                        MessageHeaders headers = accessor.getMessageHeaders();
904                                        conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
905                                                        new ListenableFutureCallback<Void>() {
906                                                                public void onSuccess(Void result) {
907                                                                }
908                                                                public void onFailure(Throwable ex) {
909                                                                        String error = "Failed to subscribe in \"system\" session.";
910                                                                        handleTcpConnectionFailure(error, ex);
911                                                                }
912                                                        });
913                                }
914                        }
915                }
916
917                @Override
918                protected void handleInboundMessage(Message<?> message) {
919                        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
920                        if (StompCommand.MESSAGE.equals(accessor.getCommand())) {
921                                String destination = accessor.getDestination();
922                                if (destination == null) {
923                                        if (logger.isDebugEnabled()) {
924                                                logger.debug("Got message on \"system\" connection, with no destination: " +
925                                                                accessor.getDetailedLogMessage(message.getPayload()));
926                                        }
927                                        return;
928                                }
929                                if (!getSystemSubscriptions().containsKey(destination)) {
930                                        if (logger.isDebugEnabled()) {
931                                                logger.debug("Got message on \"system\" connection with no handler: " +
932                                                                accessor.getDetailedLogMessage(message.getPayload()));
933                                        }
934                                        return;
935                                }
936                                try {
937                                        MessageHandler handler = getSystemSubscriptions().get(destination);
938                                        handler.handleMessage(message);
939                                }
940                                catch (Throwable ex) {
941                                        if (logger.isDebugEnabled()) {
942                                                logger.debug("Error while handling message on \"system\" connection.", ex);
943                                        }
944                                }
945                        }
946                }
947
948                @Override
949                protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
950                        super.handleTcpConnectionFailure(errorMessage, ex);
951                        publishBrokerUnavailableEvent();
952                }
953
954                @Override
955                public void afterConnectionClosed() {
956                        super.afterConnectionClosed();
957                        publishBrokerUnavailableEvent();
958                }
959
960                @Override
961                public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
962                        try {
963                                ListenableFuture<Void> future = super.forward(message, accessor);
964                                if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
965                                        future.get();
966                                }
967                                return future;
968                        }
969                        catch (Throwable ex) {
970                                throw new MessageDeliveryException(message, ex);
971                        }
972                }
973        }
974
975
976        private static class StompTcpClientFactory {
977
978                public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor2StompCodec codec) {
979                        return new Reactor2TcpClient<byte[]>(relayHost, relayPort, codec);
980                }
981        }
982
983
984        private static class VoidCallable implements Callable<Void> {
985
986                @Override
987                public Void call() throws Exception {
988                        return null;
989                }
990        }
991
992
993        private class Stats {
994
995                private final AtomicInteger connect = new AtomicInteger();
996
997                private final AtomicInteger connected = new AtomicInteger();
998
999                private final AtomicInteger disconnect = new AtomicInteger();
1000
1001                public void incrementConnectCount() {
1002                        this.connect.incrementAndGet();
1003                }
1004
1005                public void incrementConnectedCount() {
1006                        this.connected.incrementAndGet();
1007                }
1008
1009                public void incrementDisconnectCount() {
1010                        this.disconnect.incrementAndGet();
1011                }
1012
1013                public String toString() {
1014                        return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
1015                                        (isBrokerAvailable() ? " (available)" : " (not available)") +
1016                                        ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
1017                                        this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");
1018                }
1019        }
1020
1021}