001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import java.security.Principal; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.Map; 023import java.util.concurrent.Callable; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.springframework.lang.Nullable; 029import org.springframework.messaging.Message; 030import org.springframework.messaging.MessageChannel; 031import org.springframework.messaging.MessageDeliveryException; 032import org.springframework.messaging.MessageHandler; 033import org.springframework.messaging.MessageHeaders; 034import org.springframework.messaging.SubscribableChannel; 035import org.springframework.messaging.simp.SimpLogging; 036import org.springframework.messaging.simp.SimpMessageHeaderAccessor; 037import org.springframework.messaging.simp.SimpMessageType; 038import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler; 039import org.springframework.messaging.support.MessageBuilder; 040import org.springframework.messaging.support.MessageHeaderAccessor; 041import org.springframework.messaging.support.MessageHeaderInitializer; 042import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; 043import org.springframework.messaging.tcp.TcpConnection; 044import org.springframework.messaging.tcp.TcpConnectionHandler; 045import org.springframework.messaging.tcp.TcpOperations; 046import org.springframework.messaging.tcp.reactor.ReactorNettyCodec; 047import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; 048import org.springframework.util.Assert; 049import org.springframework.util.concurrent.ListenableFuture; 050import org.springframework.util.concurrent.ListenableFutureCallback; 051import org.springframework.util.concurrent.ListenableFutureTask; 052 053/** 054 * A {@link org.springframework.messaging.MessageHandler} that handles messages by 055 * forwarding them to a STOMP broker. 056 * 057 * <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP 058 * connection to the broker is opened and used exclusively for all messages from the 059 * client that originated the CONNECT message. Messages from the same client are 060 * identified through the session id message header. Reversely, when the STOMP broker 061 * sends messages back on the TCP connection, those messages are enriched with the 062 * session id of the client and sent back downstream through the {@link MessageChannel} 063 * provided to the constructor. 064 * 065 * <p>This class also automatically opens a default "system" TCP connection to the 066 * message broker that is used for sending messages that originate from the server 067 * application (as opposed to from a client). Such messages are not associated with 068 * any client and therefore do not have a session id header. The "system" connection 069 * is effectively shared and cannot be used to receive messages. Several properties 070 * are provided to configure the "system" connection including: 071 * <ul> 072 * <li>{@link #setSystemLogin}</li> 073 * <li>{@link #setSystemPasscode}</li> 074 * <li>{@link #setSystemHeartbeatSendInterval}</li> 075 * <li>{@link #setSystemHeartbeatReceiveInterval}</li> 076 * </ul> 077 * 078 * @author Rossen Stoyanchev 079 * @author Andy Wilkinson 080 * @since 4.0 081 */ 082public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler { 083 084 /** 085 * The system session ID. 086 */ 087 public static final String SYSTEM_SESSION_ID = "_system_"; 088 089 /** STOMP recommended error of margin for receiving heartbeats. */ 090 private static final long HEARTBEAT_MULTIPLIER = 3; 091 092 /** 093 * Heartbeat starts once CONNECTED frame with heartbeat settings is received. 094 * If CONNECTED doesn't arrive within a minute, we'll close the connection. 095 */ 096 private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; 097 098 private static final byte[] EMPTY_PAYLOAD = new byte[0]; 099 100 private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable()); 101 102 private static final Message<byte[]> HEARTBEAT_MESSAGE; 103 104 105 static { 106 EMPTY_TASK.run(); 107 StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat(); 108 HEARTBEAT_MESSAGE = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders()); 109 } 110 111 112 private String relayHost = "127.0.0.1"; 113 114 private int relayPort = 61613; 115 116 private String clientLogin = "guest"; 117 118 private String clientPasscode = "guest"; 119 120 private String systemLogin = "guest"; 121 122 private String systemPasscode = "guest"; 123 124 private long systemHeartbeatSendInterval = 10000; 125 126 private long systemHeartbeatReceiveInterval = 10000; 127 128 private final Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4); 129 130 @Nullable 131 private String virtualHost; 132 133 @Nullable 134 private TcpOperations<byte[]> tcpClient; 135 136 @Nullable 137 private MessageHeaderInitializer headerInitializer; 138 139 private final DefaultStats stats = new DefaultStats(); 140 141 private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<>(); 142 143 144 /** 145 * Create a StompBrokerRelayMessageHandler instance with the given message channels 146 * and destination prefixes. 147 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 148 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 149 * @param brokerChannel the channel for the application to send messages to the broker 150 * @param destinationPrefixes the broker supported destination prefixes; destinations 151 * that do not match the given prefix are ignored. 152 */ 153 public StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 154 SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) { 155 156 super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes); 157 } 158 159 160 /** 161 * Set the STOMP message broker host. 162 */ 163 public void setRelayHost(String relayHost) { 164 Assert.hasText(relayHost, "relayHost must not be empty"); 165 this.relayHost = relayHost; 166 } 167 168 /** 169 * Return the STOMP message broker host. 170 */ 171 public String getRelayHost() { 172 return this.relayHost; 173 } 174 175 /** 176 * Set the STOMP message broker port. 177 */ 178 public void setRelayPort(int relayPort) { 179 this.relayPort = relayPort; 180 } 181 182 /** 183 * Return the STOMP message broker port. 184 */ 185 public int getRelayPort() { 186 return this.relayPort; 187 } 188 /** 189 * Set the login to use when creating connections to the STOMP broker on 190 * behalf of connected clients. 191 * <p>By default this is set to "guest". 192 * @see #setSystemLogin(String) 193 */ 194 public void setClientLogin(String clientLogin) { 195 Assert.hasText(clientLogin, "clientLogin must not be empty"); 196 this.clientLogin = clientLogin; 197 } 198 199 /** 200 * Return the configured login to use for connections to the STOMP broker 201 * on behalf of connected clients. 202 * @see #getSystemLogin() 203 */ 204 public String getClientLogin() { 205 return this.clientLogin; 206 } 207 208 /** 209 * Set the client passcode to use to create connections to the STOMP broker on 210 * behalf of connected clients. 211 * <p>By default this is set to "guest". 212 * @see #setSystemPasscode 213 */ 214 public void setClientPasscode(String clientPasscode) { 215 Assert.hasText(clientPasscode, "clientPasscode must not be empty"); 216 this.clientPasscode = clientPasscode; 217 } 218 219 /** 220 * Return the configured passcode to use for connections to the STOMP broker on 221 * behalf of connected clients. 222 * @see #getSystemPasscode() 223 */ 224 public String getClientPasscode() { 225 return this.clientPasscode; 226 } 227 228 /** 229 * Set the login for the shared "system" connection used to send messages to 230 * the STOMP broker from within the application, i.e. messages not associated 231 * with a specific client session (e.g. REST/HTTP request handling method). 232 * <p>By default this is set to "guest". 233 */ 234 public void setSystemLogin(String systemLogin) { 235 Assert.hasText(systemLogin, "systemLogin must not be empty"); 236 this.systemLogin = systemLogin; 237 } 238 239 /** 240 * Return the login used for the shared "system" connection to the STOMP broker. 241 */ 242 public String getSystemLogin() { 243 return this.systemLogin; 244 } 245 246 /** 247 * Set the passcode for the shared "system" connection used to send messages to 248 * the STOMP broker from within the application, i.e. messages not associated 249 * with a specific client session (e.g. REST/HTTP request handling method). 250 * <p>By default this is set to "guest". 251 */ 252 public void setSystemPasscode(String systemPasscode) { 253 this.systemPasscode = systemPasscode; 254 } 255 256 /** 257 * Return the passcode used for the shared "system" connection to the STOMP broker. 258 */ 259 public String getSystemPasscode() { 260 return this.systemPasscode; 261 } 262 263 264 /** 265 * Set the interval, in milliseconds, at which the "system" connection will, in the 266 * absence of any other data being sent, send a heartbeat to the STOMP broker. A value 267 * of zero will prevent heartbeats from being sent to the broker. 268 * <p>The default value is 10000. 269 * <p>See class-level documentation for more information on the "system" connection. 270 */ 271 public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { 272 this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; 273 } 274 275 /** 276 * Return the interval, in milliseconds, at which the "system" connection will 277 * send heartbeats to the STOMP broker. 278 */ 279 public long getSystemHeartbeatSendInterval() { 280 return this.systemHeartbeatSendInterval; 281 } 282 283 /** 284 * Set the maximum interval, in milliseconds, at which the "system" connection 285 * expects, in the absence of any other data, to receive a heartbeat from the STOMP 286 * broker. A value of zero will configure the connection to expect not to receive 287 * heartbeats from the broker. 288 * <p>The default value is 10000. 289 * <p>See class-level documentation for more information on the "system" connection. 290 */ 291 public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { 292 this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; 293 } 294 295 /** 296 * Return the interval, in milliseconds, at which the "system" connection expects 297 * to receive heartbeats from the STOMP broker. 298 */ 299 public long getSystemHeartbeatReceiveInterval() { 300 return this.systemHeartbeatReceiveInterval; 301 } 302 303 /** 304 * Configure one more destinations to subscribe to on the shared "system" 305 * connection along with MessageHandler's to handle received messages. 306 * <p>This is for internal use in a multi-application server scenario where 307 * servers forward messages to each other (e.g. unresolved user destinations). 308 * @param subscriptions the destinations to subscribe to. 309 */ 310 public void setSystemSubscriptions(@Nullable Map<String, MessageHandler> subscriptions) { 311 this.systemSubscriptions.clear(); 312 if (subscriptions != null) { 313 this.systemSubscriptions.putAll(subscriptions); 314 } 315 } 316 317 /** 318 * Return the configured map with subscriptions on the "system" connection. 319 */ 320 public Map<String, MessageHandler> getSystemSubscriptions() { 321 return this.systemSubscriptions; 322 } 323 324 /** 325 * Set the value of the "host" header to use in STOMP CONNECT frames. When this 326 * property is configured, a "host" header will be added to every STOMP frame sent to 327 * the STOMP broker. This may be useful for example in a cloud environment where the 328 * actual host to which the TCP connection is established is different from the host 329 * providing the cloud-based STOMP service. 330 * <p>By default this property is not set. 331 */ 332 public void setVirtualHost(@Nullable String virtualHost) { 333 this.virtualHost = virtualHost; 334 } 335 336 /** 337 * Return the configured virtual host value. 338 */ 339 @Nullable 340 public String getVirtualHost() { 341 return this.virtualHost; 342 } 343 344 /** 345 * Configure a TCP client for managing TCP connections to the STOMP broker. 346 * <p>By default {@link ReactorNettyTcpClient} is used. 347 * <p><strong>Note:</strong> when this property is used, any 348 * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port} 349 * specified are effectively ignored. 350 */ 351 public void setTcpClient(@Nullable TcpOperations<byte[]> tcpClient) { 352 this.tcpClient = tcpClient; 353 } 354 355 /** 356 * Get the configured TCP client (never {@code null} unless not configured 357 * invoked and this method is invoked before the handler is started and 358 * hence a default implementation initialized). 359 */ 360 @Nullable 361 public TcpOperations<byte[]> getTcpClient() { 362 return this.tcpClient; 363 } 364 365 /** 366 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all 367 * messages created through the {@code StompBrokerRelayMessageHandler} that 368 * are sent to the client outbound message channel. 369 * <p>By default this property is not set. 370 */ 371 public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer) { 372 this.headerInitializer = headerInitializer; 373 } 374 375 /** 376 * Return the configured header initializer. 377 */ 378 @Nullable 379 public MessageHeaderInitializer getHeaderInitializer() { 380 return this.headerInitializer; 381 } 382 383 /** 384 * Return a String describing internal state and counters. 385 * Effectively {@code toString()} on {@link #getStats() getStats()}. 386 */ 387 public String getStatsInfo() { 388 return this.stats.toString(); 389 } 390 391 /** 392 * Return a structured object with internal state and counters. 393 * @since 5.2 394 */ 395 public Stats getStats() { 396 return this.stats; 397 } 398 399 400 /** 401 * Return the current count of TCP connection to the broker. 402 */ 403 public int getConnectionCount() { 404 return this.connectionHandlers.size(); 405 } 406 407 408 @Override 409 protected void startInternal() { 410 if (this.tcpClient == null) { 411 this.tcpClient = initTcpClient(); 412 } 413 414 if (logger.isInfoEnabled()) { 415 logger.info("Starting \"system\" session, " + toString()); 416 } 417 418 StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT); 419 accessor.setAcceptVersion("1.1,1.2"); 420 accessor.setLogin(this.systemLogin); 421 accessor.setPasscode(this.systemPasscode); 422 accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval); 423 String virtualHost = getVirtualHost(); 424 if (virtualHost != null) { 425 accessor.setHost(virtualHost); 426 } 427 accessor.setSessionId(SYSTEM_SESSION_ID); 428 if (logger.isDebugEnabled()) { 429 logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD)); 430 } 431 432 SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor); 433 this.connectionHandlers.put(handler.getSessionId(), handler); 434 435 this.stats.incrementConnectCount(); 436 this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000)); 437 } 438 439 private ReactorNettyTcpClient<byte[]> initTcpClient() { 440 StompDecoder decoder = new StompDecoder(); 441 if (this.headerInitializer != null) { 442 decoder.setHeaderInitializer(this.headerInitializer); 443 } 444 ReactorNettyCodec<byte[]> codec = new StompReactorNettyCodec(decoder); 445 ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec); 446 client.setLogger(SimpLogging.forLog(client.getLogger())); 447 return client; 448 } 449 450 @Override 451 protected void stopInternal() { 452 publishBrokerUnavailableEvent(); 453 if (this.tcpClient != null) { 454 try { 455 this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS); 456 } 457 catch (Throwable ex) { 458 logger.error("Error in shutdown of TCP client", ex); 459 } 460 } 461 } 462 463 @Override 464 protected void handleMessageInternal(Message<?> message) { 465 String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders()); 466 467 if (!isBrokerAvailable()) { 468 if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) { 469 throw new MessageDeliveryException("Message broker not active. Consider subscribing to " + 470 "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean."); 471 } 472 StompConnectionHandler handler = this.connectionHandlers.get(sessionId); 473 if (handler != null) { 474 handler.sendStompErrorFrameToClient("Broker not available."); 475 handler.clearConnection(); 476 } 477 else { 478 StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); 479 if (getHeaderInitializer() != null) { 480 getHeaderInitializer().initHeaders(accessor); 481 } 482 accessor.setSessionId(sessionId); 483 Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders()); 484 if (user != null) { 485 accessor.setUser(user); 486 } 487 accessor.setMessage("Broker not available."); 488 MessageHeaders headers = accessor.getMessageHeaders(); 489 getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)); 490 } 491 return; 492 } 493 494 StompHeaderAccessor stompAccessor; 495 StompCommand command; 496 497 MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); 498 if (accessor == null) { 499 throw new IllegalStateException( 500 "No header accessor (not using the SimpMessagingTemplate?): " + message); 501 } 502 else if (accessor instanceof StompHeaderAccessor) { 503 stompAccessor = (StompHeaderAccessor) accessor; 504 command = stompAccessor.getCommand(); 505 } 506 else if (accessor instanceof SimpMessageHeaderAccessor) { 507 stompAccessor = StompHeaderAccessor.wrap(message); 508 command = stompAccessor.getCommand(); 509 if (command == null) { 510 command = stompAccessor.updateStompCommandAsClientMessage(); 511 } 512 } 513 else { 514 throw new IllegalStateException( 515 "Unexpected header accessor type " + accessor.getClass() + " in " + message); 516 } 517 518 if (sessionId == null) { 519 if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) { 520 if (logger.isErrorEnabled()) { 521 logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message); 522 } 523 return; 524 } 525 sessionId = SYSTEM_SESSION_ID; 526 stompAccessor.setSessionId(sessionId); 527 } 528 529 String destination = stompAccessor.getDestination(); 530 if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) { 531 return; 532 } 533 534 if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) { 535 if (logger.isDebugEnabled()) { 536 logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD)); 537 } 538 stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message)); 539 stompAccessor.setLogin(this.clientLogin); 540 stompAccessor.setPasscode(this.clientPasscode); 541 if (getVirtualHost() != null) { 542 stompAccessor.setHost(getVirtualHost()); 543 } 544 StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor); 545 this.connectionHandlers.put(sessionId, handler); 546 this.stats.incrementConnectCount(); 547 Assert.state(this.tcpClient != null, "No TCP client available"); 548 this.tcpClient.connect(handler); 549 } 550 else if (StompCommand.DISCONNECT.equals(command)) { 551 StompConnectionHandler handler = this.connectionHandlers.get(sessionId); 552 if (handler == null) { 553 if (logger.isDebugEnabled()) { 554 logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up."); 555 } 556 return; 557 } 558 this.stats.incrementDisconnectCount(); 559 handler.forward(message, stompAccessor); 560 } 561 else { 562 StompConnectionHandler handler = this.connectionHandlers.get(sessionId); 563 if (handler == null) { 564 if (logger.isDebugEnabled()) { 565 logger.debug("No TCP connection for session " + sessionId + " in " + message); 566 } 567 return; 568 } 569 handler.forward(message, stompAccessor); 570 } 571 } 572 573 @Override 574 public String toString() { 575 return "StompBrokerRelay[" + getTcpClientInfo() + "]"; 576 } 577 578 private String getTcpClientInfo() { 579 return this.tcpClient != null ? this.tcpClient.toString() : this.relayHost + ":" + this.relayPort; 580 } 581 582 583 private class StompConnectionHandler implements TcpConnectionHandler<byte[]> { 584 585 private final String sessionId; 586 587 private final boolean isRemoteClientSession; 588 589 private final StompHeaderAccessor connectHeaders; 590 591 private final MessageChannel outboundChannel; 592 593 @Nullable 594 private volatile TcpConnection<byte[]> tcpConnection; 595 596 private volatile boolean isStompConnected; 597 598 599 protected StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) { 600 this(sessionId, connectHeaders, true); 601 } 602 603 private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) { 604 Assert.notNull(sessionId, "'sessionId' must not be null"); 605 Assert.notNull(connectHeaders, "'connectHeaders' must not be null"); 606 this.sessionId = sessionId; 607 this.connectHeaders = connectHeaders; 608 this.isRemoteClientSession = isClientSession; 609 this.outboundChannel = getClientOutboundChannelForSession(sessionId); 610 } 611 612 public String getSessionId() { 613 return this.sessionId; 614 } 615 616 @Nullable 617 protected TcpConnection<byte[]> getTcpConnection() { 618 return this.tcpConnection; 619 } 620 621 @Override 622 public void afterConnected(TcpConnection<byte[]> connection) { 623 if (logger.isDebugEnabled()) { 624 logger.debug("TCP connection opened in session=" + getSessionId()); 625 } 626 this.tcpConnection = connection; 627 connection.onReadInactivity(() -> { 628 if (this.tcpConnection != null && !this.isStompConnected) { 629 handleTcpConnectionFailure("No CONNECTED frame received in " + 630 MAX_TIME_TO_CONNECTED_FRAME + " ms.", null); 631 } 632 }, MAX_TIME_TO_CONNECTED_FRAME); 633 connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders())); 634 } 635 636 @Override 637 public void afterConnectFailure(Throwable ex) { 638 handleTcpConnectionFailure("Failed to connect: " + ex.getMessage(), ex); 639 } 640 641 /** 642 * Invoked when any TCP connectivity issue is detected, i.e. failure to establish 643 * the TCP connection, failure to send a message, missed heartbeat, etc. 644 */ 645 protected void handleTcpConnectionFailure(String error, @Nullable Throwable ex) { 646 if (logger.isInfoEnabled()) { 647 logger.info("TCP connection failure in session " + this.sessionId + ": " + error, ex); 648 } 649 try { 650 sendStompErrorFrameToClient(error); 651 } 652 finally { 653 try { 654 clearConnection(); 655 } 656 catch (Throwable ex2) { 657 if (logger.isDebugEnabled()) { 658 logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex2); 659 } 660 } 661 } 662 } 663 664 private void sendStompErrorFrameToClient(String errorText) { 665 if (this.isRemoteClientSession) { 666 StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); 667 if (getHeaderInitializer() != null) { 668 getHeaderInitializer().initHeaders(accessor); 669 } 670 accessor.setSessionId(this.sessionId); 671 Principal user = this.connectHeaders.getUser(); 672 if (user != null) { 673 accessor.setUser(user); 674 } 675 accessor.setMessage(errorText); 676 accessor.setLeaveMutable(true); 677 Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders()); 678 handleInboundMessage(errorMessage); 679 } 680 } 681 682 protected void handleInboundMessage(Message<?> message) { 683 if (this.isRemoteClientSession) { 684 this.outboundChannel.send(message); 685 } 686 } 687 688 @Override 689 public void handleMessage(Message<byte[]> message) { 690 StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); 691 Assert.state(accessor != null, "No StompHeaderAccessor"); 692 accessor.setSessionId(this.sessionId); 693 Principal user = this.connectHeaders.getUser(); 694 if (user != null) { 695 accessor.setUser(user); 696 } 697 698 StompCommand command = accessor.getCommand(); 699 if (StompCommand.CONNECTED.equals(command)) { 700 if (logger.isDebugEnabled()) { 701 logger.debug("Received " + accessor.getShortLogMessage(EMPTY_PAYLOAD)); 702 } 703 afterStompConnected(accessor); 704 } 705 else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) { 706 logger.error("Received " + accessor.getShortLogMessage(message.getPayload())); 707 } 708 else if (logger.isTraceEnabled()) { 709 logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload())); 710 } 711 712 handleInboundMessage(message); 713 } 714 715 /** 716 * Invoked after the STOMP CONNECTED frame is received. At this point the 717 * connection is ready for sending STOMP messages to the broker. 718 */ 719 protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { 720 this.isStompConnected = true; 721 stats.incrementConnectedCount(); 722 initHeartbeats(connectedHeaders); 723 } 724 725 private void initHeartbeats(StompHeaderAccessor connectedHeaders) { 726 if (this.isRemoteClientSession) { 727 return; 728 } 729 730 TcpConnection<byte[]> con = this.tcpConnection; 731 Assert.state(con != null, "No TcpConnection available"); 732 733 long clientSendInterval = this.connectHeaders.getHeartbeat()[0]; 734 long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1]; 735 long serverSendInterval = connectedHeaders.getHeartbeat()[0]; 736 long serverReceiveInterval = connectedHeaders.getHeartbeat()[1]; 737 738 if (clientSendInterval > 0 && serverReceiveInterval > 0) { 739 long interval = Math.max(clientSendInterval, serverReceiveInterval); 740 con.onWriteInactivity(() -> 741 con.send(HEARTBEAT_MESSAGE).addCallback( 742 result -> {}, 743 ex -> handleTcpConnectionFailure( 744 "Failed to forward heartbeat: " + ex.getMessage(), ex)), interval); 745 } 746 if (clientReceiveInterval > 0 && serverSendInterval > 0) { 747 final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER; 748 con.onReadInactivity( 749 () -> handleTcpConnectionFailure("No messages received in " + interval + " ms.", null), interval); 750 } 751 } 752 753 @Override 754 public void handleFailure(Throwable ex) { 755 if (this.tcpConnection != null) { 756 handleTcpConnectionFailure("Transport failure: " + ex.getMessage(), ex); 757 } 758 else if (logger.isErrorEnabled()) { 759 logger.error("Transport failure: " + ex); 760 } 761 } 762 763 @Override 764 public void afterConnectionClosed() { 765 if (this.tcpConnection == null) { 766 return; 767 } 768 try { 769 if (logger.isDebugEnabled()) { 770 logger.debug("TCP connection to broker closed in session " + this.sessionId); 771 } 772 sendStompErrorFrameToClient("Connection to broker closed."); 773 } 774 finally { 775 try { 776 // Prevent clearConnection() from trying to close 777 this.tcpConnection = null; 778 clearConnection(); 779 } 780 catch (Throwable ex) { 781 // Shouldn't happen with connection reset beforehand 782 } 783 } 784 } 785 786 /** 787 * Forward the given message to the STOMP broker. 788 * <p>The method checks whether we have an active TCP connection and have 789 * received the STOMP CONNECTED frame. For client messages this should be 790 * false only if we lose the TCP connection around the same time when a 791 * client message is being forwarded, so we simply log the ignored message 792 * at debug level. For messages from within the application being sent on 793 * the "system" connection an exception is raised so that components sending 794 * the message have a chance to handle it -- by default the broker message 795 * channel is synchronous. 796 * <p>Note that if messages arrive concurrently around the same time a TCP 797 * connection is lost, there is a brief period of time before the connection 798 * is reset when one or more messages may sneak through and an attempt made 799 * to forward them. Rather than synchronizing to guard against that, this 800 * method simply lets them try and fail. For client sessions that may 801 * result in an additional STOMP ERROR frame(s) being sent downstream but 802 * code handling that downstream should be idempotent in such cases. 803 * @param message the message to send (never {@code null}) 804 * @return a future to wait for the result 805 */ 806 @SuppressWarnings("unchecked") 807 public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) { 808 TcpConnection<byte[]> conn = this.tcpConnection; 809 810 if (!this.isStompConnected || conn == null) { 811 if (this.isRemoteClientSession) { 812 if (logger.isDebugEnabled()) { 813 logger.debug("TCP connection closed already, ignoring " + 814 accessor.getShortLogMessage(message.getPayload())); 815 } 816 return EMPTY_TASK; 817 } 818 else { 819 throw new IllegalStateException("Cannot forward messages " + 820 (conn != null ? "before STOMP CONNECTED. " : "while inactive. ") + 821 "Consider subscribing to receive BrokerAvailabilityEvent's from " + 822 "an ApplicationListener Spring bean. Dropped " + 823 accessor.getShortLogMessage(message.getPayload())); 824 } 825 } 826 827 final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ? 828 MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message; 829 830 StompCommand command = accessor.getCommand(); 831 if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) || 832 StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) { 833 logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload())); 834 } 835 else if (logger.isTraceEnabled()) { 836 logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload())); 837 } 838 839 ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend); 840 future.addCallback(new ListenableFutureCallback<Void>() { 841 @Override 842 public void onSuccess(@Nullable Void result) { 843 if (accessor.getCommand() == StompCommand.DISCONNECT) { 844 afterDisconnectSent(accessor); 845 } 846 } 847 @Override 848 public void onFailure(Throwable ex) { 849 if (tcpConnection != null) { 850 handleTcpConnectionFailure("failed to forward " + 851 accessor.getShortLogMessage(message.getPayload()), ex); 852 } 853 else if (logger.isErrorEnabled()) { 854 logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload())); 855 } 856 } 857 }); 858 return future; 859 } 860 861 /** 862 * After a DISCONNECT there should be no more client frames so we can 863 * close the connection pro-actively. However, if the DISCONNECT has a 864 * receipt header we leave the connection open and expect the server will 865 * respond with a RECEIPT and then close the connection. 866 * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#DISCONNECT"> 867 * STOMP Specification 1.2 DISCONNECT</a> 868 */ 869 private void afterDisconnectSent(StompHeaderAccessor accessor) { 870 if (accessor.getReceipt() == null) { 871 try { 872 clearConnection(); 873 } 874 catch (Throwable ex) { 875 if (logger.isDebugEnabled()) { 876 logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex); 877 } 878 } 879 } 880 } 881 882 /** 883 * Clean up state associated with the connection and close it. 884 * Any exception arising from closing the connection are propagated. 885 */ 886 public void clearConnection() { 887 if (logger.isDebugEnabled()) { 888 logger.debug("Cleaning up connection state for session " + this.sessionId); 889 } 890 891 if (this.isRemoteClientSession) { 892 StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); 893 } 894 895 this.isStompConnected = false; 896 897 TcpConnection<byte[]> conn = this.tcpConnection; 898 this.tcpConnection = null; 899 if (conn != null) { 900 if (logger.isDebugEnabled()) { 901 logger.debug("Closing TCP connection in session " + this.sessionId); 902 } 903 conn.close(); 904 } 905 } 906 907 @Override 908 public String toString() { 909 return "StompConnectionHandler[sessionId=" + this.sessionId + "]"; 910 } 911 } 912 913 914 private class SystemStompConnectionHandler extends StompConnectionHandler { 915 916 public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { 917 super(SYSTEM_SESSION_ID, connectHeaders, false); 918 } 919 920 @Override 921 protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { 922 if (logger.isInfoEnabled()) { 923 logger.info("\"System\" session connected."); 924 } 925 super.afterStompConnected(connectedHeaders); 926 publishBrokerAvailableEvent(); 927 sendSystemSubscriptions(); 928 } 929 930 private void sendSystemSubscriptions() { 931 int i = 0; 932 for (String destination : getSystemSubscriptions().keySet()) { 933 StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); 934 accessor.setSubscriptionId(String.valueOf(i++)); 935 accessor.setDestination(destination); 936 if (logger.isDebugEnabled()) { 937 logger.debug("Subscribing to " + destination + " on \"system\" connection."); 938 } 939 TcpConnection<byte[]> conn = getTcpConnection(); 940 if (conn != null) { 941 MessageHeaders headers = accessor.getMessageHeaders(); 942 conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback( 943 result -> {}, 944 ex -> { 945 String error = "Failed to subscribe in \"system\" session."; 946 handleTcpConnectionFailure(error, ex); 947 }); 948 } 949 } 950 } 951 952 @Override 953 protected void handleInboundMessage(Message<?> message) { 954 StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); 955 if (accessor != null && StompCommand.MESSAGE.equals(accessor.getCommand())) { 956 String destination = accessor.getDestination(); 957 if (destination == null) { 958 if (logger.isDebugEnabled()) { 959 logger.debug("Got message on \"system\" connection, with no destination: " + 960 accessor.getDetailedLogMessage(message.getPayload())); 961 } 962 return; 963 } 964 if (!getSystemSubscriptions().containsKey(destination)) { 965 if (logger.isDebugEnabled()) { 966 logger.debug("Got message on \"system\" connection with no handler: " + 967 accessor.getDetailedLogMessage(message.getPayload())); 968 } 969 return; 970 } 971 try { 972 MessageHandler handler = getSystemSubscriptions().get(destination); 973 handler.handleMessage(message); 974 } 975 catch (Throwable ex) { 976 if (logger.isDebugEnabled()) { 977 logger.debug("Error while handling message on \"system\" connection.", ex); 978 } 979 } 980 } 981 } 982 983 @Override 984 protected void handleTcpConnectionFailure(String errorMessage, @Nullable Throwable ex) { 985 super.handleTcpConnectionFailure(errorMessage, ex); 986 publishBrokerUnavailableEvent(); 987 } 988 989 @Override 990 public void afterConnectionClosed() { 991 super.afterConnectionClosed(); 992 publishBrokerUnavailableEvent(); 993 } 994 995 @Override 996 public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) { 997 try { 998 ListenableFuture<Void> future = super.forward(message, accessor); 999 if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) { 1000 future.get(); 1001 } 1002 return future; 1003 } 1004 catch (Throwable ex) { 1005 throw new MessageDeliveryException(message, ex); 1006 } 1007 } 1008 } 1009 1010 1011 private static class VoidCallable implements Callable<Void> { 1012 1013 @Override 1014 public Void call() { 1015 return null; 1016 } 1017 } 1018 1019 1020 /** 1021 * Contract for access to session counters. 1022 * @since 5.2 1023 */ 1024 public interface Stats { 1025 1026 /** 1027 * The number of connection handlers. 1028 */ 1029 int getTotalHandlers(); 1030 1031 /** 1032 * The number of CONNECT frames processed. 1033 */ 1034 int getTotalConnect(); 1035 1036 /** 1037 * The number of CONNECTED frames processed. 1038 */ 1039 int getTotalConnected(); 1040 1041 /** 1042 * The number of DISCONNECT frames processed. 1043 */ 1044 int getTotalDisconnect(); 1045 } 1046 1047 1048 private class DefaultStats implements Stats { 1049 1050 private final AtomicInteger connect = new AtomicInteger(); 1051 1052 private final AtomicInteger connected = new AtomicInteger(); 1053 1054 private final AtomicInteger disconnect = new AtomicInteger(); 1055 1056 public void incrementConnectCount() { 1057 this.connect.incrementAndGet(); 1058 } 1059 1060 public void incrementConnectedCount() { 1061 this.connected.incrementAndGet(); 1062 } 1063 1064 public void incrementDisconnectCount() { 1065 this.disconnect.incrementAndGet(); 1066 } 1067 1068 @Override 1069 public int getTotalHandlers() { 1070 return connectionHandlers.size(); 1071 } 1072 1073 @Override 1074 public int getTotalConnect() { 1075 return this.connect.get(); 1076 } 1077 1078 @Override 1079 public int getTotalConnected() { 1080 return this.connected.get(); 1081 } 1082 1083 @Override 1084 public int getTotalDisconnect() { 1085 return this.disconnect.get(); 1086 } 1087 1088 @Override 1089 public String toString() { 1090 return (connectionHandlers.size() + " sessions, " + getTcpClientInfo() + 1091 (isBrokerAvailable() ? " (available)" : " (not available)") + 1092 ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + 1093 this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"); 1094 } 1095 } 1096 1097}