001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.messaging;
018
019import java.io.IOException;
020import java.nio.ByteBuffer;
021import java.security.Principal;
022import java.util.Arrays;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import org.springframework.context.ApplicationEvent;
033import org.springframework.context.ApplicationEventPublisher;
034import org.springframework.context.ApplicationEventPublisherAware;
035import org.springframework.messaging.Message;
036import org.springframework.messaging.MessageChannel;
037import org.springframework.messaging.simp.SimpAttributes;
038import org.springframework.messaging.simp.SimpAttributesContextHolder;
039import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
040import org.springframework.messaging.simp.SimpMessageType;
041import org.springframework.messaging.simp.stomp.BufferingStompDecoder;
042import org.springframework.messaging.simp.stomp.StompCommand;
043import org.springframework.messaging.simp.stomp.StompDecoder;
044import org.springframework.messaging.simp.stomp.StompEncoder;
045import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
046import org.springframework.messaging.simp.user.DestinationUserNameProvider;
047import org.springframework.messaging.support.AbstractMessageChannel;
048import org.springframework.messaging.support.ChannelInterceptor;
049import org.springframework.messaging.support.ImmutableMessageChannelInterceptor;
050import org.springframework.messaging.support.MessageBuilder;
051import org.springframework.messaging.support.MessageHeaderAccessor;
052import org.springframework.messaging.support.MessageHeaderInitializer;
053import org.springframework.util.Assert;
054import org.springframework.util.MimeTypeUtils;
055import org.springframework.web.socket.BinaryMessage;
056import org.springframework.web.socket.CloseStatus;
057import org.springframework.web.socket.TextMessage;
058import org.springframework.web.socket.WebSocketMessage;
059import org.springframework.web.socket.WebSocketSession;
060import org.springframework.web.socket.handler.SessionLimitExceededException;
061import org.springframework.web.socket.handler.WebSocketSessionDecorator;
062import org.springframework.web.socket.sockjs.transport.SockJsSession;
063
064/**
065 * A {@link SubProtocolHandler} for STOMP that supports versions 1.0, 1.1, and 1.2
066 * of the STOMP specification.
067 *
068 * @author Rossen Stoyanchev
069 * @author Andy Wilkinson
070 * @since 4.0
071 */
072public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationEventPublisherAware {
073
074        /**
075         * This handler supports assembling large STOMP messages split into multiple
076         * WebSocket messages and STOMP clients (like stomp.js) indeed split large STOMP
077         * messages at 16K boundaries. Therefore the WebSocket server input message
078         * buffer size must allow 16K at least plus a little extra for SockJS framing.
079         */
080        public static final int MINIMUM_WEBSOCKET_MESSAGE_SIZE = 16 * 1024 + 256;
081
082        /**
083         * The name of the header set on the CONNECTED frame indicating the name
084         * of the user authenticated on the WebSocket session.
085         */
086        public static final String CONNECTED_USER_HEADER = "user-name";
087
        private static final Log logger = LogFactory.getLog(StompSubProtocolHandler.class);

        /** Shared zero-length payload for frames built without a body (ERROR and DISCONNECT). */
        private static final byte[] EMPTY_PAYLOAD = new byte[0];


        /** Optional error handler; while {@code null}, errors go through {@code sendErrorMessage}. */
        private StompSubProtocolErrorHandler errorHandler;

        /** Buffer size limit handed to each per-session {@link BufferingStompDecoder}; 64K by default. */
        private int messageSizeLimit = 64 * 1024;

        /** Deprecated registry of user session ids; only consulted when it has been set. */
        @SuppressWarnings("deprecation")
        private org.springframework.messaging.simp.user.UserSessionRegistry userSessionRegistry;

        /** Encoder used for every frame written to a WebSocket session. */
        private StompEncoder stompEncoder = new StompEncoder();

        /** Decoder wrapped by the buffering decoder created for each new session. */
        private StompDecoder stompDecoder = new StompDecoder();

        /** Buffering decoders keyed by session id; added when a session starts, removed when it ends. */
        private final Map<String, BufferingStompDecoder> decoders = new ConcurrentHashMap<String, BufferingStompDecoder>();

        /** Optional initializer applied to the decoder and to DISCONNECT messages created here. */
        private MessageHeaderInitializer headerInitializer;

        /**
         * Users found on the headers of a sent CONNECT message that differ from the
         * session principal, keyed by session id; removed when the session ends.
         */
        private final Map<String, Principal> stompAuthentications = new ConcurrentHashMap<String, Principal>();

        /** Cached result of {@code detectImmutableMessageInterceptor}; {@code null} until first computed. */
        private Boolean immutableMessageInterceptorPresent;

        /** Publisher for session events; no events are published while this is {@code null}. */
        private ApplicationEventPublisher eventPublisher;

        /** Counters for processed CONNECT, CONNECTED and DISCONNECT frames. */
        private final Stats stats = new Stats();
115
116
117        /**
118         * Configure a handler for error messages sent to clients which allows
119         * customizing the error messages or preventing them from being sent.
120         * <p>By default this isn't configured in which case an ERROR frame is sent
121         * with a message header reflecting the error.
122         * @param errorHandler the error handler
123         */
124        public void setErrorHandler(StompSubProtocolErrorHandler errorHandler) {
125                this.errorHandler = errorHandler;
126        }
127
128        /**
129         * Return the configured error handler.
130         */
131        public StompSubProtocolErrorHandler getErrorHandler() {
132                return this.errorHandler;
133        }
134
135        /**
136         * Configure the maximum size allowed for an incoming STOMP message.
137         * Since a STOMP message can be received in multiple WebSocket messages,
138         * buffering may be required and therefore it is necessary to know the maximum
139         * allowed message size.
140         * <p>By default this property is set to 64K.
141         * @since 4.0.3
142         */
143        public void setMessageSizeLimit(int messageSizeLimit) {
144                this.messageSizeLimit = messageSizeLimit;
145        }
146
147        /**
148         * Get the configured message buffer size limit in bytes.
149         * @since 4.0.3
150         */
151        public int getMessageSizeLimit() {
152                return this.messageSizeLimit;
153        }
154
155        /**
156         * Provide a registry with which to register active user session ids.
157         * @see org.springframework.messaging.simp.user.UserDestinationMessageHandler
158         * @deprecated as of 4.2 in favor of {@link DefaultSimpUserRegistry} which relies
159         * on the ApplicationContext events published by this class and is created via
160         * {@link org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurationSupport#createLocalUserRegistry
161         * WebSocketMessageBrokerConfigurationSupport.createLocalUserRegistry}
162         */
163        @Deprecated
164        public void setUserSessionRegistry(org.springframework.messaging.simp.user.UserSessionRegistry registry) {
165                this.userSessionRegistry = registry;
166        }
167
168        /**
169         * @deprecated as of 4.2
170         */
171        @Deprecated
172        public org.springframework.messaging.simp.user.UserSessionRegistry getUserSessionRegistry() {
173                return this.userSessionRegistry;
174        }
175
176        /**
177         * Configure a {@link StompEncoder} for encoding STOMP frames
178         * @since 4.3.5
179         */
180        public void setEncoder(StompEncoder encoder) {
181                this.stompEncoder = encoder;
182        }
183
184        /**
185         * Configure a {@link StompDecoder} for decoding STOMP frames
186         * @since 4.3.5
187         */
188        public void setDecoder(StompDecoder decoder) {
189                this.stompDecoder = decoder;
190        }
191
192        /**
193         * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
194         * messages created from decoded STOMP frames and other messages sent to the
195         * client inbound channel.
196         * <p>By default this property is not set.
197         */
198        public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
199                this.headerInitializer = headerInitializer;
200                this.stompDecoder.setHeaderInitializer(headerInitializer);
201        }
202
203        /**
204         * Return the configured header initializer.
205         */
206        public MessageHeaderInitializer getHeaderInitializer() {
207                return this.headerInitializer;
208        }
209
210        @Override
211        public List<String> getSupportedProtocols() {
212                return Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
213        }
214
215        @Override
216        public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
217                this.eventPublisher = applicationEventPublisher;
218        }
219
220        /**
221         * Return a String describing internal state and counters.
222         */
223        public String getStatsInfo() {
224                return this.stats.toString();
225        }
226
227
228        /**
229         * Handle incoming WebSocket messages from clients.
230         */
231        public void handleMessageFromClient(WebSocketSession session,
232                        WebSocketMessage<?> webSocketMessage, MessageChannel outputChannel) {
233
234                List<Message<byte[]>> messages;
235                try {
236                        ByteBuffer byteBuffer;
237                        if (webSocketMessage instanceof TextMessage) {
238                                byteBuffer = ByteBuffer.wrap(((TextMessage) webSocketMessage).asBytes());
239                        }
240                        else if (webSocketMessage instanceof BinaryMessage) {
241                                byteBuffer = ((BinaryMessage) webSocketMessage).getPayload();
242                        }
243                        else {
244                                return;
245                        }
246
247                        BufferingStompDecoder decoder = this.decoders.get(session.getId());
248                        if (decoder == null) {
249                                throw new IllegalStateException("No decoder for session id '" + session.getId() + "'");
250                        }
251
252                        messages = decoder.decode(byteBuffer);
253                        if (messages.isEmpty()) {
254                                if (logger.isTraceEnabled()) {
255                                        logger.trace("Incomplete STOMP frame content received in session " +
256                                                        session + ", bufferSize=" + decoder.getBufferSize() +
257                                                        ", bufferSizeLimit=" + decoder.getBufferSizeLimit() + ".");
258                                }
259                                return;
260                        }
261                }
262                catch (Throwable ex) {
263                        if (logger.isErrorEnabled()) {
264                                logger.error("Failed to parse " + webSocketMessage +
265                                                " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex);
266                        }
267                        handleError(session, ex, null);
268                        return;
269                }
270
271                for (Message<byte[]> message : messages) {
272                        try {
273                                StompHeaderAccessor headerAccessor =
274                                                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
275
276                                headerAccessor.setSessionId(session.getId());
277                                headerAccessor.setSessionAttributes(session.getAttributes());
278                                headerAccessor.setUser(getUser(session));
279                                headerAccessor.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, headerAccessor.getHeartbeat());
280                                if (!detectImmutableMessageInterceptor(outputChannel)) {
281                                        headerAccessor.setImmutable();
282                                }
283
284                                if (logger.isTraceEnabled()) {
285                                        logger.trace("From client: " + headerAccessor.getShortLogMessage(message.getPayload()));
286                                }
287
288                                StompCommand command = headerAccessor.getCommand();
289                                boolean isConnect = StompCommand.CONNECT.equals(command);
290                                if (isConnect) {
291                                        this.stats.incrementConnectCount();
292                                }
293                                else if (StompCommand.DISCONNECT.equals(command)) {
294                                        this.stats.incrementDisconnectCount();
295                                }
296
297                                try {
298                                        SimpAttributesContextHolder.setAttributesFromMessage(message);
299                                        boolean sent = outputChannel.send(message);
300
301                                        if (sent) {
302                                                if (isConnect) {
303                                                        Principal user = headerAccessor.getUser();
304                                                        if (user != null && user != session.getPrincipal()) {
305                                                                this.stompAuthentications.put(session.getId(), user);
306                                                        }
307                                                }
308                                                if (this.eventPublisher != null) {
309                                                        if (isConnect) {
310                                                                publishEvent(new SessionConnectEvent(this, message, getUser(session)));
311                                                        }
312                                                        else if (StompCommand.SUBSCRIBE.equals(command)) {
313                                                                publishEvent(new SessionSubscribeEvent(this, message, getUser(session)));
314                                                        }
315                                                        else if (StompCommand.UNSUBSCRIBE.equals(command)) {
316                                                                publishEvent(new SessionUnsubscribeEvent(this, message, getUser(session)));
317                                                        }
318                                                }
319                                        }
320                                }
321                                finally {
322                                        SimpAttributesContextHolder.resetAttributes();
323                                }
324                        }
325                        catch (Throwable ex) {
326                                if (logger.isErrorEnabled()) {
327                                        logger.error("Failed to send client message to application via MessageChannel" +
328                                                        " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex);
329                                }
330                                handleError(session, ex, message);
331                        }
332                }
333        }
334
335        private Principal getUser(WebSocketSession session) {
336                Principal user = this.stompAuthentications.get(session.getId());
337                return user != null ? user : session.getPrincipal();
338        }
339
340        @SuppressWarnings("deprecation")
341        private void handleError(WebSocketSession session, Throwable ex, Message<byte[]> clientMessage) {
342                if (getErrorHandler() == null) {
343                        sendErrorMessage(session, ex);
344                        return;
345                }
346
347                Message<byte[]> message = getErrorHandler().handleClientMessageProcessingError(clientMessage, ex);
348                if (message == null) {
349                        return;
350                }
351
352                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
353                Assert.state(accessor != null, "Expected STOMP headers");
354                sendToClient(session, accessor, message.getPayload());
355        }
356
357        /**
358         * Invoked when no
359         * {@link #setErrorHandler(StompSubProtocolErrorHandler) errorHandler}
360         * is configured to send an ERROR frame to the client.
361         * @deprecated as of Spring 4.2, in favor of
362         * {@link #setErrorHandler(StompSubProtocolErrorHandler) configuring}
363         * a {@code StompSubProtocolErrorHandler}
364         */
365        @Deprecated
366        protected void sendErrorMessage(WebSocketSession session, Throwable error) {
367                StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
368                headerAccessor.setMessage(error.getMessage());
369
370                byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD);
371                try {
372                        session.sendMessage(new TextMessage(bytes));
373                }
374                catch (Throwable ex) {
375                        // Could be part of normal workflow (e.g. browser tab closed)
376                        logger.debug("Failed to send STOMP ERROR to client", ex);
377                }
378        }
379
380        private boolean detectImmutableMessageInterceptor(MessageChannel channel) {
381                if (this.immutableMessageInterceptorPresent != null) {
382                        return this.immutableMessageInterceptorPresent;
383                }
384
385                if (channel instanceof AbstractMessageChannel) {
386                        for (ChannelInterceptor interceptor : ((AbstractMessageChannel) channel).getInterceptors()) {
387                                if (interceptor instanceof ImmutableMessageChannelInterceptor) {
388                                        this.immutableMessageInterceptorPresent = true;
389                                        return true;
390                                }
391                        }
392                }
393                this.immutableMessageInterceptorPresent = false;
394                return false;
395        }
396
397        private void publishEvent(ApplicationEvent event) {
398                try {
399                        this.eventPublisher.publishEvent(event);
400                }
401                catch (Throwable ex) {
402                        if (logger.isErrorEnabled()) {
403                                logger.error("Error publishing " + event, ex);
404                        }
405                }
406        }
407
408        /**
409         * Handle STOMP messages going back out to WebSocket clients.
410         */
411        @Override
412        @SuppressWarnings("unchecked")
413        public void handleMessageToClient(WebSocketSession session, Message<?> message) {
414                if (!(message.getPayload() instanceof byte[])) {
415                        if (logger.isErrorEnabled()) {
416                                logger.error("Expected byte[] payload. Ignoring " + message + ".");
417                        }
418                        return;
419                }
420
421                StompHeaderAccessor accessor = getStompHeaderAccessor(message);
422                StompCommand command = accessor.getCommand();
423
424                if (StompCommand.MESSAGE.equals(command)) {
425                        if (accessor.getSubscriptionId() == null && logger.isWarnEnabled()) {
426                                logger.warn("No STOMP \"subscription\" header in " + message);
427                        }
428                        String origDestination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
429                        if (origDestination != null) {
430                                accessor = toMutableAccessor(accessor, message);
431                                accessor.removeNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
432                                accessor.setDestination(origDestination);
433                        }
434                }
435                else if (StompCommand.CONNECTED.equals(command)) {
436                        this.stats.incrementConnectedCount();
437                        accessor = afterStompSessionConnected(message, accessor, session);
438                        if (this.eventPublisher != null) {
439                                try {
440                                        SimpAttributes simpAttributes = new SimpAttributes(session.getId(), session.getAttributes());
441                                        SimpAttributesContextHolder.setAttributes(simpAttributes);
442                                        Principal user = getUser(session);
443                                        publishEvent(new SessionConnectedEvent(this, (Message<byte[]>) message, user));
444                                }
445                                finally {
446                                        SimpAttributesContextHolder.resetAttributes();
447                                }
448                        }
449                }
450
451                byte[] payload = (byte[]) message.getPayload();
452                if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) {
453                        Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message);
454                        accessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class);
455                        Assert.state(accessor != null, "Expected STOMP headers");
456                        payload = errorMessage.getPayload();
457                }
458                sendToClient(session, accessor, payload);
459        }
460
461        private void sendToClient(WebSocketSession session, StompHeaderAccessor stompAccessor, byte[] payload) {
462                StompCommand command = stompAccessor.getCommand();
463                try {
464                        byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), payload);
465                        boolean useBinary = (payload.length > 0 && !(session instanceof SockJsSession) &&
466                                        MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(stompAccessor.getContentType()));
467                        if (useBinary) {
468                                session.sendMessage(new BinaryMessage(bytes));
469                        }
470                        else {
471                                session.sendMessage(new TextMessage(bytes));
472                        }
473                }
474                catch (SessionLimitExceededException ex) {
475                        // Bad session, just get out
476                        throw ex;
477                }
478                catch (Throwable ex) {
479                        // Could be part of normal workflow (e.g. browser tab closed)
480                        if (logger.isDebugEnabled()) {
481                                logger.debug("Failed to send WebSocket message to client in session " + session.getId(), ex);
482                        }
483                        command = StompCommand.ERROR;
484                }
485                finally {
486                        if (StompCommand.ERROR.equals(command)) {
487                                try {
488                                        session.close(CloseStatus.PROTOCOL_ERROR);
489                                }
490                                catch (IOException ex) {
491                                        // Ignore
492                                }
493                        }
494                }
495        }
496
497        private StompHeaderAccessor getStompHeaderAccessor(Message<?> message) {
498                MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
499                if (accessor instanceof StompHeaderAccessor) {
500                        return (StompHeaderAccessor) accessor;
501                }
502                else {
503                        StompHeaderAccessor stompAccessor = StompHeaderAccessor.wrap(message);
504                        SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
505                        if (SimpMessageType.CONNECT_ACK.equals(messageType)) {
506                                stompAccessor = convertConnectAcktoStompConnected(stompAccessor);
507                        }
508                        else if (SimpMessageType.DISCONNECT_ACK.equals(messageType)) {
509                                String receipt = getDisconnectReceipt(stompAccessor);
510                                if (receipt != null) {
511                                        stompAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
512                                        stompAccessor.setReceiptId(receipt);
513                                }
514                                else {
515                                        stompAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
516                                        stompAccessor.setMessage("Session closed.");
517                                }
518                        }
519                        else if (SimpMessageType.HEARTBEAT.equals(messageType)) {
520                                stompAccessor = StompHeaderAccessor.createForHeartbeat();
521                        }
522                        else if (stompAccessor.getCommand() == null || StompCommand.SEND.equals(stompAccessor.getCommand())) {
523                                stompAccessor.updateStompCommandAsServerMessage();
524                        }
525                        return stompAccessor;
526                }
527        }
528
529        /**
530         * The simple broker produces {@code SimpMessageType.CONNECT_ACK} that's not STOMP
531         * specific and needs to be turned into a STOMP CONNECTED frame.
532         */
533        private StompHeaderAccessor convertConnectAcktoStompConnected(StompHeaderAccessor connectAckHeaders) {
534                String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER;
535                Message<?> message = (Message<?>) connectAckHeaders.getHeader(name);
536                if (message == null) {
537                        throw new IllegalStateException("Original STOMP CONNECT not found in " + connectAckHeaders);
538                }
539
540                StompHeaderAccessor connectHeaders = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
541                StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
542
543                Set<String> acceptVersions = connectHeaders.getAcceptVersion();
544                if (acceptVersions.contains("1.2")) {
545                        connectedHeaders.setVersion("1.2");
546                }
547                else if (acceptVersions.contains("1.1")) {
548                        connectedHeaders.setVersion("1.1");
549                }
550                else if (!acceptVersions.isEmpty()) {
551                        throw new IllegalArgumentException("Unsupported STOMP version '" + acceptVersions + "'");
552                }
553
554                long[] heartbeat = (long[]) connectAckHeaders.getHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER);
555                if (heartbeat != null) {
556                        connectedHeaders.setHeartbeat(heartbeat[0], heartbeat[1]);
557                }
558                else {
559                        connectedHeaders.setHeartbeat(0, 0);
560                }
561
562                return connectedHeaders;
563        }
564
565        private String getDisconnectReceipt(SimpMessageHeaderAccessor simpHeaders) {
566                String name = StompHeaderAccessor.DISCONNECT_MESSAGE_HEADER;
567                Message<?> message = (Message<?>) simpHeaders.getHeader(name);
568                if (message != null) {
569                        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
570                        return accessor.getReceipt();
571                }
572                return null;
573        }
574
575        protected StompHeaderAccessor toMutableAccessor(StompHeaderAccessor headerAccessor, Message<?> message) {
576                return (headerAccessor.isMutable() ? headerAccessor : StompHeaderAccessor.wrap(message));
577        }
578
579        @SuppressWarnings("deprecation")
580        private StompHeaderAccessor afterStompSessionConnected(Message<?> message, StompHeaderAccessor accessor,
581                        WebSocketSession session) {
582
583                Principal principal = getUser(session);
584                if (principal != null) {
585                        accessor = toMutableAccessor(accessor, message);
586                        accessor.setNativeHeader(CONNECTED_USER_HEADER, principal.getName());
587                        if (this.userSessionRegistry != null) {
588                                String userName = getSessionRegistryUserName(principal);
589                                this.userSessionRegistry.registerSessionId(userName, session.getId());
590                        }
591                }
592
593                long[] heartbeat = accessor.getHeartbeat();
594                if (heartbeat[1] > 0) {
595                        session = WebSocketSessionDecorator.unwrap(session);
596                        if (session instanceof SockJsSession) {
597                                ((SockJsSession) session).disableHeartbeat();
598                        }
599                }
600
601                return accessor;
602        }
603
604        private String getSessionRegistryUserName(Principal principal) {
605                String userName = principal.getName();
606                if (principal instanceof DestinationUserNameProvider) {
607                        userName = ((DestinationUserNameProvider) principal).getDestinationUserName();
608                }
609                return userName;
610        }
611
612        @Override
613        public String resolveSessionId(Message<?> message) {
614                return SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
615        }
616
617        @Override
618        public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) {
619                if (session.getTextMessageSizeLimit() < MINIMUM_WEBSOCKET_MESSAGE_SIZE) {
620                        session.setTextMessageSizeLimit(MINIMUM_WEBSOCKET_MESSAGE_SIZE);
621                }
622                this.decoders.put(session.getId(), new BufferingStompDecoder(this.stompDecoder, getMessageSizeLimit()));
623        }
624
625        @Override
626        @SuppressWarnings("deprecation")
627        public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) {
628                this.decoders.remove(session.getId());
629
630                Principal principal = getUser(session);
631                if (principal != null && this.userSessionRegistry != null) {
632                        String userName = getSessionRegistryUserName(principal);
633                        this.userSessionRegistry.unregisterSessionId(userName, session.getId());
634                }
635
636                Message<byte[]> message = createDisconnectMessage(session);
637                SimpAttributes simpAttributes = SimpAttributes.fromMessage(message);
638                try {
639                        SimpAttributesContextHolder.setAttributes(simpAttributes);
640                        if (this.eventPublisher != null) {
641                                Principal user = getUser(session);
642                                publishEvent(new SessionDisconnectEvent(this, message, session.getId(), closeStatus, user));
643                        }
644                        outputChannel.send(message);
645                }
646                finally {
647                        this.stompAuthentications.remove(session.getId());
648                        SimpAttributesContextHolder.resetAttributes();
649                        simpAttributes.sessionCompleted();
650                }
651        }
652
653        private Message<byte[]> createDisconnectMessage(WebSocketSession session) {
654                StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
655                if (getHeaderInitializer() != null) {
656                        getHeaderInitializer().initHeaders(headerAccessor);
657                }
658                headerAccessor.setSessionId(session.getId());
659                headerAccessor.setSessionAttributes(session.getAttributes());
660                headerAccessor.setUser(getUser(session));
661                return MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
662        }
663
664        @Override
665        public String toString() {
666                return "StompSubProtocolHandler" + getSupportedProtocols();
667        }
668
669
670        private static class Stats {
671
672                private final AtomicInteger connect = new AtomicInteger();
673
674                private final AtomicInteger connected = new AtomicInteger();
675
676                private final AtomicInteger disconnect = new AtomicInteger();
677
678                public void incrementConnectCount() {
679                        this.connect.incrementAndGet();
680                }
681
682                public void incrementConnectedCount() {
683                        this.connected.incrementAndGet();
684                }
685
686                public void incrementDisconnectCount() {
687                        this.disconnect.incrementAndGet();
688                }
689
690                public String toString() {
691                        return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
692                                        this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
693                }
694        }
695
696}