001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.transport.session;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.security.Principal;
023import java.util.List;
024import java.util.Map;
025import java.util.Queue;
026import java.util.concurrent.LinkedBlockingDeque;
027
028import org.springframework.http.HttpHeaders;
029import org.springframework.lang.Nullable;
030import org.springframework.util.Assert;
031import org.springframework.util.StringUtils;
032import org.springframework.web.socket.CloseStatus;
033import org.springframework.web.socket.TextMessage;
034import org.springframework.web.socket.WebSocketExtension;
035import org.springframework.web.socket.WebSocketHandler;
036import org.springframework.web.socket.WebSocketSession;
037import org.springframework.web.socket.adapter.NativeWebSocketSession;
038import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
039import org.springframework.web.socket.sockjs.frame.SockJsFrame;
040import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
041
042/**
043 * A SockJS session for use with the WebSocket transport.
044 *
045 * @author Rossen Stoyanchev
046 * @author Juergen Hoeller
047 * @since 4.0
048 */
049public class WebSocketServerSockJsSession extends AbstractSockJsSession implements NativeWebSocketSession {
050
051        @Nullable
052        private WebSocketSession webSocketSession;
053
054        private volatile boolean openFrameSent;
055
056        private final Queue<String> initSessionCache = new LinkedBlockingDeque<>();
057
058        private final Object initSessionLock = new Object();
059
060        private final Object disconnectLock = new Object();
061
062        private volatile boolean disconnected;
063
064
065        public WebSocketServerSockJsSession(String id, SockJsServiceConfig config,
066                        WebSocketHandler handler, @Nullable Map<String, Object> attributes) {
067
068                super(id, config, handler, attributes);
069        }
070
071
072        @Override
073        @Nullable
074        public URI getUri() {
075                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
076                return this.webSocketSession.getUri();
077        }
078
079        @Override
080        public HttpHeaders getHandshakeHeaders() {
081                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
082                return this.webSocketSession.getHandshakeHeaders();
083        }
084
085        @Override
086        public Principal getPrincipal() {
087                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
088                return this.webSocketSession.getPrincipal();
089        }
090
091        @Override
092        public InetSocketAddress getLocalAddress() {
093                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
094                return this.webSocketSession.getLocalAddress();
095        }
096
097        @Override
098        public InetSocketAddress getRemoteAddress() {
099                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
100                return this.webSocketSession.getRemoteAddress();
101        }
102
103        @Override
104        public String getAcceptedProtocol() {
105                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
106                return this.webSocketSession.getAcceptedProtocol();
107        }
108
109        @Override
110        public void setTextMessageSizeLimit(int messageSizeLimit) {
111                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
112                this.webSocketSession.setTextMessageSizeLimit(messageSizeLimit);
113        }
114
115        @Override
116        public int getTextMessageSizeLimit() {
117                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
118                return this.webSocketSession.getTextMessageSizeLimit();
119        }
120
121        @Override
122        public void setBinaryMessageSizeLimit(int messageSizeLimit) {
123                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
124                this.webSocketSession.setBinaryMessageSizeLimit(messageSizeLimit);
125        }
126
127        @Override
128        public int getBinaryMessageSizeLimit() {
129                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
130                return this.webSocketSession.getBinaryMessageSizeLimit();
131        }
132
133        @Override
134        public List<WebSocketExtension> getExtensions() {
135                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
136                return this.webSocketSession.getExtensions();
137        }
138
139        @Override
140        public Object getNativeSession() {
141                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
142                return (this.webSocketSession instanceof NativeWebSocketSession ?
143                                ((NativeWebSocketSession) this.webSocketSession).getNativeSession() : this.webSocketSession);
144        }
145
146        @Override
147        @Nullable
148        public <T> T getNativeSession(@Nullable Class<T> requiredType) {
149                return (this.webSocketSession instanceof NativeWebSocketSession ?
150                                ((NativeWebSocketSession) this.webSocketSession).getNativeSession(requiredType) : null);
151        }
152
153
154        public void initializeDelegateSession(WebSocketSession session) {
155                synchronized (this.initSessionLock) {
156                        this.webSocketSession = session;
157                        try {
158                                // Let "our" handler know before sending the open frame to the remote handler
159                                delegateConnectionEstablished();
160                                this.webSocketSession.sendMessage(new TextMessage(SockJsFrame.openFrame().getContent()));
161
162                                // Flush any messages cached in the mean time
163                                while (!this.initSessionCache.isEmpty()) {
164                                        writeFrame(SockJsFrame.messageFrame(getMessageCodec(), this.initSessionCache.poll()));
165                                }
166                                scheduleHeartbeat();
167                                this.openFrameSent = true;
168                        }
169                        catch (Exception ex) {
170                                tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
171                        }
172                }
173        }
174
175        @Override
176        public boolean isActive() {
177                return (this.webSocketSession != null && this.webSocketSession.isOpen() && !this.disconnected);
178        }
179
180        public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception {
181                String payload = message.getPayload();
182                if (!StringUtils.hasLength(payload)) {
183                        return;
184                }
185                String[] messages;
186                try {
187                        messages = getSockJsServiceConfig().getMessageCodec().decode(payload);
188                }
189                catch (Exception ex) {
190                        logger.error("Broken data received. Terminating WebSocket connection abruptly", ex);
191                        tryCloseWithSockJsTransportError(ex, CloseStatus.BAD_DATA);
192                        return;
193                }
194                if (messages != null) {
195                        delegateMessages(messages);
196                }
197        }
198
199        @Override
200        public void sendMessageInternal(String message) throws SockJsTransportFailureException {
201                // Open frame not sent yet?
202                // If in the session initialization thread, then cache, otherwise wait.
203                if (!this.openFrameSent) {
204                        synchronized (this.initSessionLock) {
205                                if (!this.openFrameSent) {
206                                        this.initSessionCache.add(message);
207                                        return;
208                                }
209                        }
210                }
211
212                cancelHeartbeat();
213                writeFrame(SockJsFrame.messageFrame(getMessageCodec(), message));
214                scheduleHeartbeat();
215        }
216
217        @Override
218        protected void writeFrameInternal(SockJsFrame frame) throws IOException {
219                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
220                if (logger.isTraceEnabled()) {
221                        logger.trace("Writing " + frame);
222                }
223                TextMessage message = new TextMessage(frame.getContent());
224                this.webSocketSession.sendMessage(message);
225        }
226
227        @Override
228        protected void disconnect(CloseStatus status) throws IOException {
229                if (isActive()) {
230                        synchronized (this.disconnectLock) {
231                                if (isActive()) {
232                                        this.disconnected = true;
233                                        if (this.webSocketSession != null) {
234                                                this.webSocketSession.close(status);
235                                        }
236                                }
237                        }
238                }
239        }
240
241}