/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.messaging;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpAttributes;
import org.springframework.messaging.simp.SimpAttributesContextHolder;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.stomp.BufferingStompDecoder;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompEncoder;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.user.DestinationUserNameProvider;
import org.springframework.messaging.support.AbstractMessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ImmutableMessageChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.SessionLimitExceededException;
import org.springframework.web.socket.handler.WebSocketSessionDecorator;
import org.springframework.web.socket.sockjs.transport.SockJsSession;

/**
 * A {@link SubProtocolHandler} for STOMP that supports versions 1.0, 1.1, and 1.2
 * of the STOMP specification.
 *
 * @author Rossen Stoyanchev
 * @author Andy Wilkinson
 * @since 4.0
 */
public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationEventPublisherAware {

	/**
	 * This handler supports assembling large STOMP messages split into multiple
	 * WebSocket messages and STOMP clients (like stomp.js) indeed split large STOMP
	 * messages at 16K boundaries. Therefore the WebSocket server input message
	 * buffer size must allow 16K at least plus a little extra for SockJS framing.
	 */
	public static final int MINIMUM_WEBSOCKET_MESSAGE_SIZE = 16 * 1024 + 256;

	/**
	 * The name of the header set on the CONNECTED frame indicating the name
	 * of the user authenticated on the WebSocket session.
	 */
	public static final String CONNECTED_USER_HEADER = "user-name";

	private static final Log logger = LogFactory.getLog(StompSubProtocolHandler.class);

	private static final byte[] EMPTY_PAYLOAD = new byte[0];


	private StompSubProtocolErrorHandler errorHandler;

	private int messageSizeLimit = 64 * 1024;

	@SuppressWarnings("deprecation")
	private org.springframework.messaging.simp.user.UserSessionRegistry userSessionRegistry;

	private StompEncoder stompEncoder = new StompEncoder();

	private StompDecoder stompDecoder = new StompDecoder();

	// One buffering decoder per WebSocket session id; added in afterSessionStarted,
	// removed in afterSessionEnded.
	private final Map<String, BufferingStompDecoder> decoders = new ConcurrentHashMap<String, BufferingStompDecoder>();

	private MessageHeaderInitializer headerInitializer;

	// Users set on a CONNECT message that differ from the session's own principal,
	// keyed by session id.
	private final Map<String, Principal> stompAuthentications = new ConcurrentHashMap<String, Principal>();

	// Cached result of detectImmutableMessageInterceptor; null until first checked.
	private Boolean immutableMessageInterceptorPresent;

	private ApplicationEventPublisher eventPublisher;

	private final Stats stats = new Stats();


	/**
	 * Configure a handler for error messages sent to clients which allows
	 * customizing the error messages or preventing them from being sent.
	 * <p>By default this isn't configured in which case an ERROR frame is sent
	 * with a message header reflecting the error.
	 * @param errorHandler the error handler
	 */
	public void setErrorHandler(StompSubProtocolErrorHandler errorHandler) {
		this.errorHandler = errorHandler;
	}

	/**
	 * Return the configured error handler.
	 */
	public StompSubProtocolErrorHandler getErrorHandler() {
		return this.errorHandler;
	}

	/**
	 * Configure the maximum size allowed for an incoming STOMP message.
	 * Since a STOMP message can be received in multiple WebSocket messages,
	 * buffering may be required and therefore it is necessary to know the maximum
	 * allowed message size.
	 * <p>By default this property is set to 64K.
	 * @since 4.0.3
	 */
	public void setMessageSizeLimit(int messageSizeLimit) {
		this.messageSizeLimit = messageSizeLimit;
	}

	/**
	 * Get the configured message buffer size limit in bytes.
	 * @since 4.0.3
	 */
	public int getMessageSizeLimit() {
		return this.messageSizeLimit;
	}

	/**
	 * Provide a registry with which to register active user session ids.
	 * @see org.springframework.messaging.simp.user.UserDestinationMessageHandler
	 * @deprecated as of 4.2 in favor of {@link DefaultSimpUserRegistry} which relies
	 * on the ApplicationContext events published by this class and is created via
	 * {@link org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurationSupport#createLocalUserRegistry
	 * WebSocketMessageBrokerConfigurationSupport.createLocalUserRegistry}
	 */
	@Deprecated
	public void setUserSessionRegistry(org.springframework.messaging.simp.user.UserSessionRegistry registry) {
		this.userSessionRegistry = registry;
	}

	/**
	 * Return the configured user session registry, if any.
	 * @deprecated as of 4.2
	 */
	@Deprecated
	public org.springframework.messaging.simp.user.UserSessionRegistry getUserSessionRegistry() {
		return this.userSessionRegistry;
	}

	/**
	 * Configure a {@link StompEncoder} for encoding STOMP frames.
	 * @since 4.3.5
	 */
	public void setEncoder(StompEncoder encoder) {
		this.stompEncoder = encoder;
	}

	/**
	 * Configure a {@link StompDecoder} for decoding STOMP frames.
	 * <p>Note that {@link #setHeaderInitializer} only applies the initializer to
	 * the decoder that is configured at the time it is called; this setter does
	 * not re-apply a previously configured header initializer to the new decoder.
	 * @since 4.3.5
	 */
	public void setDecoder(StompDecoder decoder) {
		this.stompDecoder = decoder;
	}

	/**
	 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
	 * messages created from decoded STOMP frames and other messages sent to the
	 * client inbound channel.
	 * <p>By default this property is not set.
	 */
	public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
		this.headerInitializer = headerInitializer;
		this.stompDecoder.setHeaderInitializer(headerInitializer);
	}

	/**
	 * Return the configured header initializer.
	 */
	public MessageHeaderInitializer getHeaderInitializer() {
		return this.headerInitializer;
	}

	@Override
	public List<String> getSupportedProtocols() {
		return Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
	}

	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.eventPublisher = applicationEventPublisher;
	}

	/**
	 * Return a String describing internal state and counters.
	 */
	public String getStatsInfo() {
		return this.stats.toString();
	}


	/**
	 * Handle incoming WebSocket messages from clients.
	 * <p>The WebSocket payload is decoded into zero or more STOMP messages using
	 * the decoder registered for the session. Each message is enriched with the
	 * session id, session attributes, user and heart-beat header and then sent to
	 * the given channel. Failures while decoding or sending are logged and passed
	 * on to the error handling of this class rather than propagated.
	 * @param session the client session
	 * @param webSocketMessage the client message; types other than
	 * {@link TextMessage} and {@link BinaryMessage} are ignored
	 * @param outputChannel the channel to send the decoded messages to
	 */
	@Override
	public void handleMessageFromClient(WebSocketSession session,
			WebSocketMessage<?> webSocketMessage, MessageChannel outputChannel) {

		List<Message<byte[]>> messages;
		try {
			ByteBuffer byteBuffer;
			if (webSocketMessage instanceof TextMessage) {
				byteBuffer = ByteBuffer.wrap(((TextMessage) webSocketMessage).asBytes());
			}
			else if (webSocketMessage instanceof BinaryMessage) {
				byteBuffer = ((BinaryMessage) webSocketMessage).getPayload();
			}
			else {
				return;
			}

			BufferingStompDecoder decoder = this.decoders.get(session.getId());
			if (decoder == null) {
				throw new IllegalStateException("No decoder for session id '" + session.getId() + "'");
			}

			messages = decoder.decode(byteBuffer);
			if (messages.isEmpty()) {
				// Nothing complete yet: the decoder keeps the partial content buffered
				if (logger.isTraceEnabled()) {
					logger.trace("Incomplete STOMP frame content received in session " +
							session + ", bufferSize=" + decoder.getBufferSize() +
							", bufferSizeLimit=" + decoder.getBufferSizeLimit() + ".");
				}
				return;
			}
		}
		catch (Throwable ex) {
			if (logger.isErrorEnabled()) {
				logger.error("Failed to parse " + webSocketMessage +
						" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex);
			}
			handleError(session, ex, null);
			return;
		}

		for (Message<byte[]> message : messages) {
			try {
				StompHeaderAccessor headerAccessor =
						MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
				// Fail with a descriptive message instead of a bare NullPointerException;
				// either way the failure ends up in handleError below.
				Assert.state(headerAccessor != null, "No StompHeaderAccessor");

				headerAccessor.setSessionId(session.getId());
				headerAccessor.setSessionAttributes(session.getAttributes());
				headerAccessor.setUser(getUser(session));
				headerAccessor.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, headerAccessor.getHeartbeat());
				if (!detectImmutableMessageInterceptor(outputChannel)) {
					headerAccessor.setImmutable();
				}

				if (logger.isTraceEnabled()) {
					logger.trace("From client: " + headerAccessor.getShortLogMessage(message.getPayload()));
				}

				StompCommand command = headerAccessor.getCommand();
				boolean isConnect = StompCommand.CONNECT.equals(command);
				if (isConnect) {
					this.stats.incrementConnectCount();
				}
				else if (StompCommand.DISCONNECT.equals(command)) {
					this.stats.incrementDisconnectCount();
				}

				try {
					SimpAttributesContextHolder.setAttributesFromMessage(message);
					boolean sent = outputChannel.send(message);

					if (sent) {
						if (isConnect) {
							// Remember a user that was set on the CONNECT message (re-read
							// after the send) when it is not the session's own principal.
							Principal user = headerAccessor.getUser();
							if (user != null && user != session.getPrincipal()) {
								this.stompAuthentications.put(session.getId(), user);
							}
						}
						if (this.eventPublisher != null) {
							if (isConnect) {
								publishEvent(new SessionConnectEvent(this, message, getUser(session)));
							}
							else if (StompCommand.SUBSCRIBE.equals(command)) {
								publishEvent(new SessionSubscribeEvent(this, message, getUser(session)));
							}
							else if (StompCommand.UNSUBSCRIBE.equals(command)) {
								publishEvent(new SessionUnsubscribeEvent(this, message, getUser(session)));
							}
						}
					}
				}
				finally {
					SimpAttributesContextHolder.resetAttributes();
				}
			}
			catch (Throwable ex) {
				if (logger.isErrorEnabled()) {
					logger.error("Failed to send client message to application via MessageChannel" +
							" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex);
				}
				handleError(session, ex, message);
			}
		}
	}

	/**
	 * Return the user recorded for the session in {@code stompAuthentications},
	 * falling back to the principal of the WebSocket session itself.
	 */
	private Principal getUser(WebSocketSession session) {
		Principal user = this.stompAuthentications.get(session.getId());
		return user != null ? user : session.getPrincipal();
	}

	/**
	 * React to a failure while processing a client message.
	 * <p>Without a configured error handler this delegates to
	 * {@link #sendErrorMessage}. Otherwise the error handler is asked for the
	 * message to send; a {@code null} result means nothing is sent.
	 * @param session the client session
	 * @param ex the failure
	 * @param clientMessage the client message being processed, or {@code null}
	 * when the failure occurred before a message could be decoded
	 */
	@SuppressWarnings("deprecation")
	private void handleError(WebSocketSession session, Throwable ex, Message<byte[]> clientMessage) {
		if (getErrorHandler() == null) {
			sendErrorMessage(session, ex);
			return;
		}

		Message<byte[]> message = getErrorHandler().handleClientMessageProcessingError(clientMessage, ex);
		if (message == null) {
			return;
		}

		StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
		Assert.state(accessor != null, "Expected STOMP headers");
		sendToClient(session, accessor, message.getPayload());
	}

	/**
	 * Invoked when no
	 * {@link #setErrorHandler(StompSubProtocolErrorHandler) errorHandler}
	 * is configured to send an ERROR frame to the client.
	 * <p>The frame carries the message of the given error and an empty payload.
	 * A failure to send it is logged at debug level and not propagated.
	 * @deprecated as of Spring 4.2, in favor of
	 * {@link #setErrorHandler(StompSubProtocolErrorHandler) configuring}
	 * a {@code StompSubProtocolErrorHandler}
	 */
	@Deprecated
	protected void sendErrorMessage(WebSocketSession session, Throwable error) {
		StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
		headerAccessor.setMessage(error.getMessage());

		byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD);
		try {
			session.sendMessage(new TextMessage(bytes));
		}
		catch (Throwable ex) {
			// Could be part of normal workflow (e.g. browser tab closed)
			if (logger.isDebugEnabled()) {
				logger.debug("Failed to send STOMP ERROR to client", ex);
			}
		}
	}

	/**
	 * Determine whether the given channel has an
	 * {@link ImmutableMessageChannelInterceptor} among its interceptors.
	 * <p>The result of the first check is cached in a field and returned for all
	 * subsequent calls, regardless of the channel passed in.
	 */
	private boolean detectImmutableMessageInterceptor(MessageChannel channel) {
		if (this.immutableMessageInterceptorPresent != null) {
			return this.immutableMessageInterceptorPresent;
		}

		if (channel instanceof AbstractMessageChannel) {
			for (ChannelInterceptor interceptor : ((AbstractMessageChannel) channel).getInterceptors()) {
				if (interceptor instanceof ImmutableMessageChannelInterceptor) {
					this.immutableMessageInterceptorPresent = true;
					return true;
				}
			}
		}
		this.immutableMessageInterceptorPresent = false;
		return false;
	}

	/**
	 * Publish the given event, logging (rather than propagating) any failure
	 * raised by the publisher. Callers check that a publisher is configured.
	 */
	private void publishEvent(ApplicationEvent event) {
		try {
			this.eventPublisher.publishEvent(event);
		}
		catch (Throwable ex) {
			if (logger.isErrorEnabled()) {
				logger.error("Error publishing " + event, ex);
			}
		}
	}

	/**
	 * Handle STOMP messages going back out to WebSocket clients.
	 * <p>Messages whose payload is not a {@code byte[]} are logged and ignored.
	 * For ERROR frames a configured error handler may replace the message; if it
	 * returns {@code null} the original ERROR frame is sent unchanged.
	 */
	@Override
	@SuppressWarnings("unchecked")
	public void handleMessageToClient(WebSocketSession session, Message<?> message) {
		if (!(message.getPayload() instanceof byte[])) {
			if (logger.isErrorEnabled()) {
				logger.error("Expected byte[] payload. Ignoring " + message + ".");
			}
			return;
		}

		StompHeaderAccessor accessor = getStompHeaderAccessor(message);
		StompCommand command = accessor.getCommand();

		if (StompCommand.MESSAGE.equals(command)) {
			if (accessor.getSubscriptionId() == null && logger.isWarnEnabled()) {
				logger.warn("No STOMP \"subscription\" header in " + message);
			}
			// Restore the original destination, if one was recorded, before sending
			String origDestination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
			if (origDestination != null) {
				accessor = toMutableAccessor(accessor, message);
				accessor.removeNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
				accessor.setDestination(origDestination);
			}
		}
		else if (StompCommand.CONNECTED.equals(command)) {
			this.stats.incrementConnectedCount();
			accessor = afterStompSessionConnected(message, accessor, session);
			if (this.eventPublisher != null) {
				try {
					SimpAttributes simpAttributes = new SimpAttributes(session.getId(), session.getAttributes());
					SimpAttributesContextHolder.setAttributes(simpAttributes);
					Principal user = getUser(session);
					publishEvent(new SessionConnectedEvent(this, (Message<byte[]>) message, user));
				}
				finally {
					SimpAttributesContextHolder.resetAttributes();
				}
			}
		}

		byte[] payload = (byte[]) message.getPayload();
		if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) {
			Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message);
			// Guard against a handler returning null: keep the original frame
			// instead of failing with a NullPointerException.
			if (errorMessage != null) {
				accessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class);
				Assert.state(accessor != null, "Expected STOMP headers");
				payload = errorMessage.getPayload();
			}
		}
		sendToClient(session, accessor, payload);
	}

	/**
	 * Encode the given headers and payload as a STOMP frame and send it to the client.
	 * <p>A binary WebSocket message is used when the payload is not empty, the
	 * session is not a {@link SockJsSession} and the content type is compatible
	 * with {@code application/octet-stream}; otherwise a text message is used.
	 * <p>A {@link SessionLimitExceededException} is re-thrown. Any other failure is
	 * logged at debug level. The session is closed with
	 * {@link CloseStatus#PROTOCOL_ERROR} after an ERROR frame and after such a
	 * send failure.
	 */
	private void sendToClient(WebSocketSession session, StompHeaderAccessor stompAccessor, byte[] payload) {
		StompCommand command = stompAccessor.getCommand();
		try {
			byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), payload);
			boolean useBinary = (payload.length > 0 && !(session instanceof SockJsSession) &&
					MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(stompAccessor.getContentType()));
			if (useBinary) {
				session.sendMessage(new BinaryMessage(bytes));
			}
			else {
				session.sendMessage(new TextMessage(bytes));
			}
		}
		catch (SessionLimitExceededException ex) {
			// Bad session, just get out
			throw ex;
		}
		catch (Throwable ex) {
			// Could be part of normal workflow (e.g. browser tab closed)
			if (logger.isDebugEnabled()) {
				logger.debug("Failed to send WebSocket message to client in session " + session.getId(), ex);
			}
			// Treat the failure like an ERROR frame so that the session is closed below
			command = StompCommand.ERROR;
		}
		finally {
			if (StompCommand.ERROR.equals(command)) {
				try {
					session.close(CloseStatus.PROTOCOL_ERROR);
				}
				catch (IOException ex) {
					// Ignore
				}
			}
		}
	}

	/**
	 * Obtain STOMP headers for an outgoing message.
	 * <p>If the message already carries a {@link StompHeaderAccessor} it is
	 * returned as is. Otherwise the message is wrapped and, based on its
	 * {@link SimpMessageType}, converted: CONNECT_ACK to a CONNECTED frame,
	 * DISCONNECT_ACK to a RECEIPT (when the original DISCONNECT asked for one) or
	 * else an ERROR frame, HEARTBEAT to a heartbeat accessor, and messages with
	 * no command or a SEND command to the server-side command.
	 */
	private StompHeaderAccessor getStompHeaderAccessor(Message<?> message) {
		MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
		if (accessor instanceof StompHeaderAccessor) {
			return (StompHeaderAccessor) accessor;
		}
		else {
			StompHeaderAccessor stompAccessor = StompHeaderAccessor.wrap(message);
			SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
			if (SimpMessageType.CONNECT_ACK.equals(messageType)) {
				stompAccessor = convertConnectAcktoStompConnected(stompAccessor);
			}
			else if (SimpMessageType.DISCONNECT_ACK.equals(messageType)) {
				String receipt = getDisconnectReceipt(stompAccessor);
				if (receipt != null) {
					stompAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
					stompAccessor.setReceiptId(receipt);
				}
				else {
					stompAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
					stompAccessor.setMessage("Session closed.");
				}
			}
			else if (SimpMessageType.HEARTBEAT.equals(messageType)) {
				stompAccessor = StompHeaderAccessor.createForHeartbeat();
			}
			else if (stompAccessor.getCommand() == null || StompCommand.SEND.equals(stompAccessor.getCommand())) {
				stompAccessor.updateStompCommandAsServerMessage();
			}
			return stompAccessor;
		}
	}

	/**
	 * The simple broker produces {@code SimpMessageType.CONNECT_ACK} that's not STOMP
	 * specific and needs to be turned into a STOMP CONNECTED frame.
	 * <p>The version is chosen from the accept-version of the original CONNECT
	 * message ("1.2" preferred over "1.1"); no version header is set when the
	 * accept-version set is empty or when no STOMP headers are available for the
	 * original CONNECT message. The heart-beat is copied from the CONNECT_ACK
	 * headers, defaulting to {@code 0,0}.
	 * @throws IllegalStateException if the original CONNECT message is missing
	 * @throws IllegalArgumentException if only unsupported versions are accepted
	 */
	private StompHeaderAccessor convertConnectAcktoStompConnected(StompHeaderAccessor connectAckHeaders) {
		String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER;
		Message<?> message = (Message<?>) connectAckHeaders.getHeader(name);
		if (message == null) {
			throw new IllegalStateException("Original STOMP CONNECT not found in " + connectAckHeaders);
		}

		StompHeaderAccessor connectHeaders = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
		StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);

		// Guard against a CONNECT message without a StompHeaderAccessor
		if (connectHeaders != null) {
			Set<String> acceptVersions = connectHeaders.getAcceptVersion();
			if (acceptVersions.contains("1.2")) {
				connectedHeaders.setVersion("1.2");
			}
			else if (acceptVersions.contains("1.1")) {
				connectedHeaders.setVersion("1.1");
			}
			else if (!acceptVersions.isEmpty()) {
				throw new IllegalArgumentException("Unsupported STOMP version '" + acceptVersions + "'");
			}
		}

		long[] heartbeat = (long[]) connectAckHeaders.getHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER);
		if (heartbeat != null) {
			connectedHeaders.setHeartbeat(heartbeat[0], heartbeat[1]);
		}
		else {
			connectedHeaders.setHeartbeat(0, 0);
		}

		return connectedHeaders;
	}

	/**
	 * Return the receipt requested by the original DISCONNECT message stored in
	 * the given headers, or {@code null} if there is no such message, it has no
	 * STOMP headers, or it did not request a receipt.
	 */
	private String getDisconnectReceipt(SimpMessageHeaderAccessor simpHeaders) {
		String name = StompHeaderAccessor.DISCONNECT_MESSAGE_HEADER;
		Message<?> message = (Message<?>) simpHeaders.getHeader(name);
		if (message != null) {
			StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
			if (accessor != null) {
				return accessor.getReceipt();
			}
		}
		return null;
	}

	/**
	 * Return the given accessor if it is mutable, or otherwise a new accessor
	 * wrapping the given message.
	 */
	protected StompHeaderAccessor toMutableAccessor(StompHeaderAccessor headerAccessor, Message<?> message) {
		return (headerAccessor.isMutable() ? headerAccessor : StompHeaderAccessor.wrap(message));
	}

	/**
	 * Post-process a CONNECTED frame before it is sent to the client.
	 * <p>When a user is known for the session, its name is added as the
	 * {@link #CONNECTED_USER_HEADER} native header and the session id is registered
	 * with the (deprecated) user session registry, if one is configured. When the
	 * second heart-beat value of the frame is greater than zero, the heartbeat of
	 * an underlying {@link SockJsSession} is disabled.
	 * @return the accessor to use from here on, possibly a mutable replacement
	 */
	@SuppressWarnings("deprecation")
	private StompHeaderAccessor afterStompSessionConnected(Message<?> message, StompHeaderAccessor accessor,
			WebSocketSession session) {

		Principal principal = getUser(session);
		if (principal != null) {
			accessor = toMutableAccessor(accessor, message);
			accessor.setNativeHeader(CONNECTED_USER_HEADER, principal.getName());
			if (this.userSessionRegistry != null) {
				String userName = getSessionRegistryUserName(principal);
				this.userSessionRegistry.registerSessionId(userName, session.getId());
			}
		}

		long[] heartbeat = accessor.getHeartbeat();
		if (heartbeat[1] > 0) {
			session = WebSocketSessionDecorator.unwrap(session);
			if (session instanceof SockJsSession) {
				((SockJsSession) session).disableHeartbeat();
			}
		}

		return accessor;
	}

	/**
	 * Return the name under which the principal is registered in the user session
	 * registry: the destination user name for a
	 * {@link DestinationUserNameProvider}, the principal name otherwise.
	 */
	private String getSessionRegistryUserName(Principal principal) {
		String userName = principal.getName();
		if (principal instanceof DestinationUserNameProvider) {
			userName = ((DestinationUserNameProvider) principal).getDestinationUserName();
		}
		return userName;
	}

	@Override
	public String resolveSessionId(Message<?> message) {
		return SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
	}

	@Override
	public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) {
		// Make sure a 16K STOMP chunk (plus framing) fits into a single WebSocket message
		if (session.getTextMessageSizeLimit() < MINIMUM_WEBSOCKET_MESSAGE_SIZE) {
			session.setTextMessageSizeLimit(MINIMUM_WEBSOCKET_MESSAGE_SIZE);
		}
		this.decoders.put(session.getId(), new BufferingStompDecoder(this.stompDecoder, getMessageSizeLimit()));
	}

	@Override
	@SuppressWarnings("deprecation")
	public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) {
		this.decoders.remove(session.getId());

		Principal principal = getUser(session);
		if (principal != null && this.userSessionRegistry != null) {
			String userName = getSessionRegistryUserName(principal);
			this.userSessionRegistry.unregisterSessionId(userName, session.getId());
		}

		Message<byte[]> message = createDisconnectMessage(session);
		SimpAttributes simpAttributes = SimpAttributes.fromMessage(message);
		try {
			SimpAttributesContextHolder.setAttributes(simpAttributes);
			if (this.eventPublisher != null) {
				Principal user = getUser(session);
				publishEvent(new SessionDisconnectEvent(this, message, session.getId(), closeStatus, user));
			}
			outputChannel.send(message);
		}
		finally {
			// Only forget the STOMP-level user once the DISCONNECT has been published and sent
			this.stompAuthentications.remove(session.getId());
			SimpAttributesContextHolder.resetAttributes();
			simpAttributes.sessionCompleted();
		}
	}

	/**
	 * Create a DISCONNECT message with an empty payload for the given session,
	 * carrying its session id, session attributes and user, after applying the
	 * configured header initializer, if any.
	 */
	private Message<byte[]> createDisconnectMessage(WebSocketSession session) {
		StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
		if (getHeaderInitializer() != null) {
			getHeaderInitializer().initHeaders(headerAccessor);
		}
		headerAccessor.setSessionId(session.getId());
		headerAccessor.setSessionAttributes(session.getAttributes());
		headerAccessor.setUser(getUser(session));
		return MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
	}

	@Override
	public String toString() {
		return "StompSubProtocolHandler" + getSupportedProtocols();
	}


	/**
	 * Counters for the CONNECT, CONNECTED and DISCONNECT frames processed by the
	 * enclosing handler.
	 */
	private static class Stats {

		private final AtomicInteger connect = new AtomicInteger();

		private final AtomicInteger connected = new AtomicInteger();

		private final AtomicInteger disconnect = new AtomicInteger();

		public void incrementConnectCount() {
			this.connect.incrementAndGet();
		}

		public void incrementConnectedCount() {
			this.connected.incrementAndGet();
		}

		public void incrementDisconnectCount() {
			this.disconnect.incrementAndGet();
		}

		@Override
		public String toString() {
			return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
					this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
		}
	}

}