/*
 * Copyright 2002-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.simp.stomp;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;

/**
 * A {@link org.springframework.messaging.MessageHandler} that handles messages by
 * forwarding them to a STOMP broker.
 *
 * <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
 * connection to the broker is opened and used exclusively for all messages from the
 * client that originated the CONNECT message. Messages from the same client are
 * identified through the session id message header. Conversely, when the STOMP broker
 * sends messages back on the TCP connection, those messages are enriched with the
 * session id of the client and sent back downstream through the {@link MessageChannel}
 * provided to the constructor.
 *
 * <p>This class also automatically opens a default "system" TCP connection to the
 * message broker that is used for sending messages that originate from the server
 * application (as opposed to from a client). Such messages are not associated with
 * any client and therefore do not have a session id header. The "system" connection
 * is effectively shared and cannot be used to receive messages. Several properties
 * are provided to configure the "system" connection including:
 * <ul>
 * <li>{@link #setSystemLogin}</li>
 * <li>{@link #setSystemPasscode}</li>
 * <li>{@link #setSystemHeartbeatSendInterval}</li>
 * <li>{@link #setSystemHeartbeatReceiveInterval}</li>
 * </ul>
 *
 * @author Rossen Stoyanchev
 * @author Andy Wilkinson
 * @since 4.0
 */
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {

	/** Session id under which the shared "system" connection is registered. */
	public static final String SYSTEM_SESSION_ID = "_system_";

	// STOMP recommends a margin of error for receiving heartbeats
	private static final long HEARTBEAT_MULTIPLIER = 3;

	/**
	 * A heartbeat is set up once a CONNECTED frame is received which contains the heartbeat
	 * settings we need. If we don't receive CONNECTED within a minute, the connection is
	 * closed proactively.
	 */
	private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;

	private static final byte[] EMPTY_PAYLOAD = new byte[0];

	/** An already-completed future returned when a client message is dropped. */
	private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<Void>(new VoidCallable());

	private static final Message<byte[]> HEARTBEAT_MESSAGE;


	static {
		// Run the task once up front so that EMPTY_TASK is in the "done" state when handed out
		EMPTY_TASK.run();
		StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
		HEARTBEAT_MESSAGE = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
	}


	private String relayHost = "127.0.0.1";

	private int relayPort = 61613;

	private String clientLogin = "guest";

	private String clientPasscode = "guest";

	private String systemLogin = "guest";

	private String systemPasscode = "guest";

	private long systemHeartbeatSendInterval = 10000;

	private long systemHeartbeatReceiveInterval = 10000;

	private final Map<String, MessageHandler> systemSubscriptions = new HashMap<String, MessageHandler>(4);

	private String virtualHost;

	private TcpOperations<byte[]> tcpClient;

	private MessageHeaderInitializer headerInitializer;

	private final Stats stats = new Stats();

	/** Connection handlers keyed by session id; includes the "system" handler. */
	private final Map<String, StompConnectionHandler> connectionHandlers =
			new ConcurrentHashMap<String, StompConnectionHandler>();


	/**
	 * Create a StompBrokerRelayMessageHandler instance with the given message channels
	 * and destination prefixes.
	 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
	 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
	 * @param brokerChannel the channel for the application to send messages to the broker
	 * @param destinationPrefixes the broker supported destination prefixes; destinations
	 * that do not match the given prefix are ignored.
	 */
	public StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
			SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {

		super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes);
	}


	/**
	 * Set the STOMP message broker host.
	 */
	public void setRelayHost(String relayHost) {
		Assert.hasText(relayHost, "relayHost must not be empty");
		this.relayHost = relayHost;
	}

	/**
	 * Return the STOMP message broker host.
	 */
	public String getRelayHost() {
		return this.relayHost;
	}

	/**
	 * Set the STOMP message broker port.
	 */
	public void setRelayPort(int relayPort) {
		this.relayPort = relayPort;
	}

	/**
	 * Return the STOMP message broker port.
	 */
	public int getRelayPort() {
		return this.relayPort;
	}

	/**
	 * Set the login to use when creating connections to the STOMP broker on
	 * behalf of connected clients.
	 * <p>By default this is set to "guest".
	 * @see #setSystemLogin(String)
	 */
	public void setClientLogin(String clientLogin) {
		Assert.hasText(clientLogin, "clientLogin must not be empty");
		this.clientLogin = clientLogin;
	}

	/**
	 * Return the configured login to use for connections to the STOMP broker
	 * on behalf of connected clients.
	 * @see #getSystemLogin()
	 */
	public String getClientLogin() {
		return this.clientLogin;
	}

	/**
	 * Set the client passcode to use to create connections to the STOMP broker on
	 * behalf of connected clients.
	 * <p>By default this is set to "guest".
	 * @see #setSystemPasscode
	 */
	public void setClientPasscode(String clientPasscode) {
		Assert.hasText(clientPasscode, "clientPasscode must not be empty");
		this.clientPasscode = clientPasscode;
	}

	/**
	 * Return the configured passcode to use for connections to the STOMP broker on
	 * behalf of connected clients.
	 * @see #getSystemPasscode()
	 */
	public String getClientPasscode() {
		return this.clientPasscode;
	}

	/**
	 * Set the login for the shared "system" connection used to send messages to
	 * the STOMP broker from within the application, i.e. messages not associated
	 * with a specific client session (e.g. REST/HTTP request handling method).
	 * <p>By default this is set to "guest".
	 */
	public void setSystemLogin(String systemLogin) {
		Assert.hasText(systemLogin, "systemLogin must not be empty");
		this.systemLogin = systemLogin;
	}

	/**
	 * Return the login used for the shared "system" connection to the STOMP broker.
	 */
	public String getSystemLogin() {
		return this.systemLogin;
	}

	/**
	 * Set the passcode for the shared "system" connection used to send messages to
	 * the STOMP broker from within the application, i.e. messages not associated
	 * with a specific client session (e.g. REST/HTTP request handling method).
	 * <p>By default this is set to "guest".
	 * <p>Note that, unlike the other credential setters, the given value is
	 * stored as-is without validation.
	 */
	public void setSystemPasscode(String systemPasscode) {
		this.systemPasscode = systemPasscode;
	}

	/**
	 * Return the passcode used for the shared "system" connection to the STOMP broker.
	 */
	public String getSystemPasscode() {
		return this.systemPasscode;
	}


	/**
	 * Set the interval, in milliseconds, at which the "system" connection will, in the
	 * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
	 * of zero will prevent heartbeats from being sent to the broker.
	 * <p>The default value is 10000.
	 * <p>See class-level documentation for more information on the "system" connection.
	 */
	public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
		this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
	}

	/**
	 * Return the interval, in milliseconds, at which the "system" connection will
	 * send heartbeats to the STOMP broker.
	 */
	public long getSystemHeartbeatSendInterval() {
		return this.systemHeartbeatSendInterval;
	}

	/**
	 * Set the maximum interval, in milliseconds, at which the "system" connection
	 * expects, in the absence of any other data, to receive a heartbeat from the STOMP
	 * broker. A value of zero will configure the connection to expect not to receive
	 * heartbeats from the broker.
	 * <p>The default value is 10000.
	 * <p>See class-level documentation for more information on the "system" connection.
	 */
	public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
		this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
	}

	/**
	 * Return the interval, in milliseconds, at which the "system" connection expects
	 * to receive heartbeats from the STOMP broker.
	 */
	public long getSystemHeartbeatReceiveInterval() {
		return this.systemHeartbeatReceiveInterval;
	}

	/**
	 * Configure one or more destinations to subscribe to on the shared "system"
	 * connection along with MessageHandler's to handle received messages.
	 * <p>This is for internal use in a multi-application server scenario where
	 * servers forward messages to each other (e.g. unresolved user destinations).
	 * <p>Any previously configured subscriptions are replaced; passing
	 * {@code null} simply clears them.
	 * @param subscriptions the destinations to subscribe to.
	 */
	public void setSystemSubscriptions(Map<String, MessageHandler> subscriptions) {
		this.systemSubscriptions.clear();
		if (subscriptions != null) {
			this.systemSubscriptions.putAll(subscriptions);
		}
	}

	/**
	 * Return the configured map with subscriptions on the "system" connection.
	 * <p>The returned map is the live, internal instance (not a copy).
	 */
	public Map<String, MessageHandler> getSystemSubscriptions() {
		return this.systemSubscriptions;
	}

	/**
	 * Set the value of the "host" header to use in STOMP CONNECT frames. When this
	 * property is configured, a "host" header will be added to every STOMP CONNECT
	 * frame sent to the STOMP broker. This may be useful for example in a cloud
	 * environment where the actual host to which the TCP connection is established
	 * is different from the host providing the cloud-based STOMP service.
	 * <p>By default this property is not set.
	 */
	public void setVirtualHost(String virtualHost) {
		this.virtualHost = virtualHost;
	}

	/**
	 * Return the configured virtual host value.
	 */
	public String getVirtualHost() {
		return this.virtualHost;
	}

	/**
	 * Configure a TCP client for managing TCP connections to the STOMP broker.
	 * By default {@link Reactor2TcpClient} is used.
	 * <p><strong>Note:</strong> when this property is used, any
	 * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
	 * specified are effectively ignored.
	 */
	public void setTcpClient(TcpOperations<byte[]> tcpClient) {
		this.tcpClient = tcpClient;
	}

	/**
	 * Return the configured TCP client. The result is {@code null} only if no
	 * client was set through {@link #setTcpClient} and this method is invoked
	 * before the handler is started, at which point a default implementation
	 * is initialized.
	 */
	public TcpOperations<byte[]> getTcpClient() {
		return this.tcpClient;
	}

	/**
	 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
	 * messages created through the {@code StompBrokerRelayMessageHandler} that
	 * are sent to the client outbound message channel.
	 * <p>By default this property is not set.
	 */
	public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
		this.headerInitializer = headerInitializer;
	}

	/**
	 * Return the configured header initializer.
	 */
	public MessageHeaderInitializer getHeaderInitializer() {
		return this.headerInitializer;
	}

	/**
	 * Return a String describing internal state and counters.
	 */
	public String getStatsInfo() {
		return this.stats.toString();
	}

	/**
	 * Return the current count of TCP connections to the broker.
	 */
	public int getConnectionCount() {
		return this.connectionHandlers.size();
	}


	/**
	 * Create the default TCP client if none was configured, then open the
	 * shared "system" connection using a fixed 5 second reconnect interval.
	 */
	@Override
	protected void startInternal() {
		if (this.tcpClient == null) {
			StompDecoder decoder = new StompDecoder();
			decoder.setHeaderInitializer(getHeaderInitializer());
			Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
			this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
		}

		if (logger.isInfoEnabled()) {
			logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
		}

		StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
		accessor.setAcceptVersion("1.1,1.2");
		accessor.setLogin(this.systemLogin);
		accessor.setPasscode(this.systemPasscode);
		accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
		accessor.setHost(getVirtualHost());
		accessor.setSessionId(SYSTEM_SESSION_ID);
		if (logger.isDebugEnabled()) {
			logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
		}

		SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
		this.connectionHandlers.put(handler.getSessionId(), handler);

		this.stats.incrementConnectCount();
		this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
	}

	/**
	 * Publish the broker-unavailable event and shut down the TCP client,
	 * waiting up to 5 seconds for the shutdown to complete. Failures during
	 * shutdown are logged and not propagated.
	 */
	@Override
	protected void stopInternal() {
		publishBrokerUnavailableEvent();
		TcpOperations<byte[]> client = this.tcpClient;
		if (client == null) {
			// Never started and no client configured: nothing to shut down
			return;
		}
		try {
			client.shutdown().get(5000, TimeUnit.MILLISECONDS);
		}
		catch (Throwable ex) {
			logger.error("Error in shutdown of TCP client", ex);
		}
	}

	/**
	 * Route the given message to the broker connection of its session.
	 * <ul>
	 * <li>If the broker is not available, messages without a client session are
	 * rejected with a {@link MessageDeliveryException} while client sessions
	 * receive a STOMP ERROR frame.</li>
	 * <li>Messages without a session id are sent on the "system" connection and
	 * must be of type {@link SimpMessageType#MESSAGE}.</li>
	 * <li>CONNECT opens a new TCP connection for the session; all other commands
	 * are forwarded on the existing connection, if any.</li>
	 * </ul>
	 */
	@Override
	protected void handleMessageInternal(Message<?> message) {
		String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());

		if (!isBrokerAvailable()) {
			if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) {
				throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
						"receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
			}
			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
			if (handler != null) {
				handler.sendStompErrorFrameToClient("Broker not available.");
				handler.clearConnection();
			}
			else {
				// No handler for this session: build the ERROR frame from the message headers
				StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
				if (getHeaderInitializer() != null) {
					getHeaderInitializer().initHeaders(accessor);
				}
				accessor.setSessionId(sessionId);
				accessor.setUser(SimpMessageHeaderAccessor.getUser(message.getHeaders()));
				accessor.setMessage("Broker not available.");
				MessageHeaders headers = accessor.getMessageHeaders();
				getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
			}
			return;
		}

		StompHeaderAccessor stompAccessor;
		StompCommand command;

		MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
		if (accessor == null) {
			throw new IllegalStateException(
					"No header accessor (not using the SimpMessagingTemplate?): " + message);
		}
		else if (accessor instanceof StompHeaderAccessor) {
			stompAccessor = (StompHeaderAccessor) accessor;
			command = stompAccessor.getCommand();
		}
		else if (accessor instanceof SimpMessageHeaderAccessor) {
			stompAccessor = StompHeaderAccessor.wrap(message);
			command = stompAccessor.getCommand();
			if (command == null) {
				command = stompAccessor.updateStompCommandAsClientMessage();
			}
		}
		else {
			throw new IllegalStateException(
					"Unexpected header accessor type " + accessor.getClass() + " in " + message);
		}

		if (sessionId == null) {
			if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) {
				if (logger.isErrorEnabled()) {
					logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
				}
				return;
			}
			// Messages from within the application go out on the shared "system" connection
			sessionId = SYSTEM_SESSION_ID;
			stompAccessor.setSessionId(sessionId);
		}

		String destination = stompAccessor.getDestination();
		if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
			return;
		}

		if (StompCommand.CONNECT.equals(command)) {
			if (logger.isDebugEnabled()) {
				logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
			}
			// Need a mutable accessor in order to replace the credentials and host
			stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
			stompAccessor.setLogin(this.clientLogin);
			stompAccessor.setPasscode(this.clientPasscode);
			if (getVirtualHost() != null) {
				stompAccessor.setHost(getVirtualHost());
			}
			StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
			this.connectionHandlers.put(sessionId, handler);
			this.stats.incrementConnectCount();
			this.tcpClient.connect(handler);
		}
		else if (StompCommand.DISCONNECT.equals(command)) {
			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
			if (handler == null) {
				if (logger.isDebugEnabled()) {
					logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
				}
				return;
			}
			this.stats.incrementDisconnectCount();
			handler.forward(message, stompAccessor);
		}
		else {
			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
			if (handler == null) {
				if (logger.isDebugEnabled()) {
					logger.debug("No TCP connection for session " + sessionId + " in " + message);
				}
				return;
			}
			handler.forward(message, stompAccessor);
		}
	}

	@Override
	public String toString() {
		return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]";
	}


	/**
	 * Handles the TCP connection to the broker for a single session: sends the
	 * CONNECT frame once the TCP connection is open, relays frames received
	 * from the broker, forwards frames to the broker, and cleans up on failure
	 * or close.
	 */
	private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {

		private final String sessionId;

		/** {@code true} for client sessions; {@code false} for the "system" session. */
		private final boolean isRemoteClientSession;

		private final StompHeaderAccessor connectHeaders;

		private volatile TcpConnection<byte[]> tcpConnection;

		private volatile boolean isStompConnected;


		private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
			this(sessionId, connectHeaders, true);
		}

		private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
			Assert.notNull(sessionId, "'sessionId' must not be null");
			Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
			this.sessionId = sessionId;
			this.connectHeaders = connectHeaders;
			this.isRemoteClientSession = isClientSession;
		}

		public String getSessionId() {
			return this.sessionId;
		}

		protected TcpConnection<byte[]> getTcpConnection() {
			return this.tcpConnection;
		}

		/**
		 * Store the connection, arm a read-inactivity check that fails the
		 * connection if no CONNECTED frame arrives in time, and send the
		 * CONNECT frame.
		 */
		@Override
		public void afterConnected(TcpConnection<byte[]> connection) {
			if (logger.isDebugEnabled()) {
				logger.debug("TCP connection opened in session=" + getSessionId());
			}
			this.tcpConnection = connection;
			this.tcpConnection.onReadInactivity(new Runnable() {
				@Override
				public void run() {
					if (tcpConnection != null && !isStompConnected) {
						handleTcpConnectionFailure("No CONNECTED frame received in " +
								MAX_TIME_TO_CONNECTED_FRAME + " ms.", null);
					}
				}
			}, MAX_TIME_TO_CONNECTED_FRAME);
			connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
		}

		@Override
		public void afterConnectFailure(Throwable ex) {
			handleTcpConnectionFailure("Failed to connect: " + ex.getMessage(), ex);
		}

		/**
		 * Invoked when any TCP connectivity issue is detected, i.e. failure to establish
		 * the TCP connection, failure to send a message, missed heartbeat, etc.
		 * <p>Logs the error, notifies the client with a STOMP ERROR frame (client
		 * sessions only), and always clears the connection state afterwards.
		 */
		protected void handleTcpConnectionFailure(String error, Throwable ex) {
			if (logger.isErrorEnabled()) {
				logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex);
			}
			try {
				sendStompErrorFrameToClient(error);
			}
			finally {
				try {
					clearConnection();
				}
				catch (Throwable ex2) {
					if (logger.isDebugEnabled()) {
						logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex2);
					}
				}
			}
		}

		/**
		 * Send a STOMP ERROR frame with the given text to the client of this
		 * session. Does nothing for the "system" session.
		 */
		private void sendStompErrorFrameToClient(String errorText) {
			if (this.isRemoteClientSession) {
				StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
				if (getHeaderInitializer() != null) {
					getHeaderInitializer().initHeaders(headerAccessor);
				}
				headerAccessor.setSessionId(this.sessionId);
				headerAccessor.setUser(this.connectHeaders.getUser());
				headerAccessor.setMessage(errorText);
				Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
				handleInboundMessage(errorMessage);
			}
		}

		/**
		 * Pass a message on to the client outbound channel after making its
		 * headers immutable. Does nothing for the "system" session.
		 */
		protected void handleInboundMessage(Message<?> message) {
			if (this.isRemoteClientSession) {
				StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
				accessor.setImmutable();
				StompBrokerRelayMessageHandler.this.getClientOutboundChannel().send(message);
			}
		}

		/**
		 * Handle a frame received from the broker: enrich it with the session
		 * id and user of this session, react to CONNECTED, and pass it on.
		 */
		@Override
		public void handleMessage(Message<byte[]> message) {
			StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
			accessor.setSessionId(this.sessionId);
			accessor.setUser(this.connectHeaders.getUser());

			StompCommand command = accessor.getCommand();
			if (StompCommand.CONNECTED.equals(command)) {
				if (logger.isDebugEnabled()) {
					logger.debug("Received " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
				}
				afterStompConnected(accessor);
			}
			else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
				logger.error("Received " + accessor.getShortLogMessage(message.getPayload()));
			}
			else if (logger.isTraceEnabled()) {
				logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
			}

			handleInboundMessage(message);
		}

		/**
		 * Invoked after the STOMP CONNECTED frame is received. At this point the
		 * connection is ready for sending STOMP messages to the broker.
		 */
		protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
			this.isStompConnected = true;
			stats.incrementConnectedCount();
			initHeartbeats(connectedHeaders);
		}

		/**
		 * Set up heartbeat sending and receive-inactivity detection based on the
		 * intervals in the CONNECT and CONNECTED frames. This is done for the
		 * "system" session only; for client sessions the method returns immediately.
		 */
		private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
			if (this.isRemoteClientSession) {
				return;
			}

			long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
			long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
			long serverSendInterval = connectedHeaders.getHeartbeat()[0];
			long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];

			if (clientSendInterval > 0 && serverReceiveInterval > 0) {
				long interval = Math.max(clientSendInterval, serverReceiveInterval);
				this.tcpConnection.onWriteInactivity(new Runnable() {
					@Override
					public void run() {
						// Re-read the field: the connection may have been cleared in the meantime
						TcpConnection<byte[]> conn = tcpConnection;
						if (conn != null) {
							conn.send(HEARTBEAT_MESSAGE).addCallback(
									new ListenableFutureCallback<Void>() {
										@Override
										public void onSuccess(Void result) {
										}
										@Override
										public void onFailure(Throwable ex) {
											handleTcpConnectionFailure(
													"Failed to forward heartbeat: " + ex.getMessage(), ex);
										}
									});
						}
					}
				}, interval);
			}
			if (clientReceiveInterval > 0 && serverSendInterval > 0) {
				// Allow for a margin of error before treating the connection as dead
				final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
				this.tcpConnection.onReadInactivity(new Runnable() {
					@Override
					public void run() {
						handleTcpConnectionFailure("No messages received in " + interval + " ms.", null);
					}
				}, interval);
			}
		}

		@Override
		public void handleFailure(Throwable ex) {
			if (this.tcpConnection != null) {
				handleTcpConnectionFailure("Transport failure: " + ex.getMessage(), ex);
			}
			else if (logger.isErrorEnabled()) {
				logger.error("Transport failure: " + ex);
			}
		}

		/**
		 * Invoked when the TCP connection was closed. If the connection state
		 * has not been cleared yet, the client is notified with a STOMP ERROR
		 * frame and the state is cleaned up.
		 */
		@Override
		public void afterConnectionClosed() {
			if (this.tcpConnection == null) {
				return;
			}
			try {
				if (logger.isDebugEnabled()) {
					logger.debug("TCP connection to broker closed in session " + this.sessionId);
				}
				sendStompErrorFrameToClient("Connection to broker closed.");
			}
			finally {
				try {
					// Prevent clearConnection() from trying to close
					this.tcpConnection = null;
					clearConnection();
				}
				catch (Throwable ex) {
					// Shouldn't happen with connection reset beforehand, but don't hide it if it does
					if (logger.isDebugEnabled()) {
						logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex);
					}
				}
			}
		}

		/**
		 * Forward the given message to the STOMP broker.
		 * <p>The method checks whether we have an active TCP connection and have
		 * received the STOMP CONNECTED frame. For client messages this should be
		 * false only if we lose the TCP connection around the same time when a
		 * client message is being forwarded, so we simply log the ignored message
		 * at debug level. For messages from within the application being sent on
		 * the "system" connection an exception is raised so that components sending
		 * the message have a chance to handle it -- by default the broker message
		 * channel is synchronous.
		 * <p>Note that if messages arrive concurrently around the same time a TCP
		 * connection is lost, there is a brief period of time before the connection
		 * is reset when one or more messages may sneak through and an attempt made
		 * to forward them. Rather than synchronizing to guard against that, this
		 * method simply lets them try and fail. For client sessions that may
		 * result in an additional STOMP ERROR frame(s) being sent downstream but
		 * code handling that downstream should be idempotent in such cases.
		 * @param message the message to send (never {@code null})
		 * @param accessor the STOMP header accessor for the message; if it is
		 * mutable and modified, a new message is created from its headers
		 * @return a future to wait for the result
		 */
		@SuppressWarnings("unchecked")
		public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
			TcpConnection<byte[]> conn = this.tcpConnection;

			if (!this.isStompConnected || conn == null) {
				if (this.isRemoteClientSession) {
					if (logger.isDebugEnabled()) {
						logger.debug("TCP connection closed already, ignoring " +
								accessor.getShortLogMessage(message.getPayload()));
					}
					return EMPTY_TASK;
				}
				else {
					throw new IllegalStateException("Cannot forward messages " +
							(conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
							"Consider subscribing to receive BrokerAvailabilityEvent's from " +
							"an ApplicationListener Spring bean. Dropped " +
							accessor.getShortLogMessage(message.getPayload()));
				}
			}

			// Pick up header changes made through the accessor, if any
			final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
					MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;

			StompCommand command = accessor.getCommand();
			if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
					StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
				logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload()));
			}
			else if (logger.isTraceEnabled()) {
				logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
			}

			ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
			future.addCallback(new ListenableFutureCallback<Void>() {
				@Override
				public void onSuccess(Void result) {
					if (accessor.getCommand() == StompCommand.DISCONNECT) {
						afterDisconnectSent(accessor);
					}
				}
				@Override
				public void onFailure(Throwable ex) {
					if (tcpConnection != null) {
						handleTcpConnectionFailure("failed to forward " +
								accessor.getShortLogMessage(message.getPayload()), ex);
					}
					else if (logger.isErrorEnabled()) {
						// Connection already cleared: just log, but keep the cause
						logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()), ex);
					}
				}
			});
			return future;
		}

		/**
		 * After a DISCONNECT there should be no more client frames so we can
		 * close the connection pro-actively. However, if the DISCONNECT has a
		 * receipt header we leave the connection open and expect the server will
		 * respond with a RECEIPT and then close the connection.
		 * @see <a href="https://stomp.github.io/stomp-specification-1.2.html#DISCONNECT">
		 * STOMP Specification 1.2 DISCONNECT</a>
		 */
		private void afterDisconnectSent(StompHeaderAccessor accessor) {
			if (accessor.getReceipt() == null) {
				try {
					clearConnection();
				}
				catch (Throwable ex) {
					if (logger.isDebugEnabled()) {
						logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex);
					}
				}
			}
		}

		/**
		 * Clean up state associated with the connection and close it.
		 * Any exception arising from closing the connection are propagated.
		 * <p>For client sessions the handler is also removed from the map of
		 * connection handlers; the "system" handler remains registered.
		 */
		public void clearConnection() {
			if (logger.isDebugEnabled()) {
				logger.debug("Cleaning up connection state for session " + this.sessionId);
			}

			if (this.isRemoteClientSession) {
				StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
			}

			this.isStompConnected = false;

			TcpConnection<byte[]> conn = this.tcpConnection;
			this.tcpConnection = null;
			if (conn != null) {
				if (logger.isDebugEnabled()) {
					logger.debug("Closing TCP connection in session " + this.sessionId);
				}
				conn.close();
			}
		}

		@Override
		public String toString() {
			return "StompConnectionHandler[sessionId=" + this.sessionId + "]";
		}
	}


	/**
	 * Connection handler for the shared "system" session. In addition to the
	 * base behavior it publishes broker availability events, sends the
	 * configured system subscriptions once connected, and dispatches received
	 * MESSAGE frames to the matching subscription handler.
	 */
	private class SystemStompConnectionHandler extends StompConnectionHandler {

		public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
			super(SYSTEM_SESSION_ID, connectHeaders, false);
		}

		@Override
		protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
			if (logger.isInfoEnabled()) {
				logger.info("\"System\" session connected.");
			}
			super.afterStompConnected(connectedHeaders);
			publishBrokerAvailableEvent();
			sendSystemSubscriptions();
		}

		/**
		 * Send a SUBSCRIBE frame for each configured system subscription,
		 * using a running counter as the subscription id.
		 */
		private void sendSystemSubscriptions() {
			int i = 0;
			for (String destination : getSystemSubscriptions().keySet()) {
				StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
				accessor.setSubscriptionId(String.valueOf(i++));
				accessor.setDestination(destination);
				if (logger.isDebugEnabled()) {
					logger.debug("Subscribing to " + destination + " on \"system\" connection.");
				}
				TcpConnection<byte[]> conn = getTcpConnection();
				if (conn != null) {
					MessageHeaders headers = accessor.getMessageHeaders();
					conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
							new ListenableFutureCallback<Void>() {
								@Override
								public void onSuccess(Void result) {
								}
								@Override
								public void onFailure(Throwable ex) {
									String error = "Failed to subscribe in \"system\" session.";
									handleTcpConnectionFailure(error, ex);
								}
							});
				}
			}
		}

		/**
		 * Dispatch a MESSAGE frame to the handler registered for its
		 * destination. Frames of any other type, frames without a destination,
		 * and frames for destinations without a handler are ignored. Errors
		 * raised by the handler are logged at debug level and not propagated.
		 */
		@Override
		protected void handleInboundMessage(Message<?> message) {
			StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
			if (StompCommand.MESSAGE.equals(accessor.getCommand())) {
				String destination = accessor.getDestination();
				if (destination == null) {
					if (logger.isDebugEnabled()) {
						logger.debug("Got message on \"system\" connection, with no destination: " +
								accessor.getDetailedLogMessage(message.getPayload()));
					}
					return;
				}
				if (!getSystemSubscriptions().containsKey(destination)) {
					if (logger.isDebugEnabled()) {
						logger.debug("Got message on \"system\" connection with no handler: " +
								accessor.getDetailedLogMessage(message.getPayload()));
					}
					return;
				}
				try {
					MessageHandler handler = getSystemSubscriptions().get(destination);
					handler.handleMessage(message);
				}
				catch (Throwable ex) {
					if (logger.isDebugEnabled()) {
						logger.debug("Error while handling message on \"system\" connection.", ex);
					}
				}
			}
		}

		@Override
		protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
			super.handleTcpConnectionFailure(errorMessage, ex);
			publishBrokerUnavailableEvent();
		}

		@Override
		public void afterConnectionClosed() {
			super.afterConnectionClosed();
			publishBrokerUnavailableEvent();
		}

		/**
		 * Forward the message and, unless the message carries the
		 * {@link SimpMessageHeaderAccessor#IGNORE_ERROR} header, block until the
		 * send completes. Any failure is wrapped in a
		 * {@link MessageDeliveryException}.
		 */
		@Override
		public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
			try {
				ListenableFuture<Void> future = super.forward(message, accessor);
				if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
					future.get();
				}
				return future;
			}
			catch (Throwable ex) {
				throw new MessageDeliveryException(message, ex);
			}
		}
	}


	/**
	 * Creates the default {@link Reactor2TcpClient} for the given host, port, and codec.
	 */
	private static class StompTcpClientFactory {

		public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor2StompCodec codec) {
			return new Reactor2TcpClient<byte[]>(relayHost, relayPort, codec);
		}
	}


	/**
	 * A no-op {@link Callable} that returns {@code null}.
	 */
	private static class VoidCallable implements Callable<Void> {

		@Override
		public Void call() throws Exception {
			return null;
		}
	}


	/**
	 * Counters for processed CONNECT, CONNECTED, and DISCONNECT frames.
	 */
	private class Stats {

		private final AtomicInteger connect = new AtomicInteger();

		private final AtomicInteger connected = new AtomicInteger();

		private final AtomicInteger disconnect = new AtomicInteger();

		public void incrementConnectCount() {
			this.connect.incrementAndGet();
		}

		public void incrementConnectedCount() {
			this.connected.incrementAndGet();
		}

		public void incrementDisconnectCount() {
			this.disconnect.incrementAndGet();
		}

		@Override
		public String toString() {
			return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
					(isBrokerAvailable() ? " (available)" : " (not available)") +
					", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
					this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");
		}
	}

}