001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.transport.session;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.security.Principal;
023import java.util.List;
024import java.util.Map;
025import java.util.Queue;
026import java.util.concurrent.LinkedBlockingDeque;
027
028import org.springframework.http.HttpHeaders;
029import org.springframework.util.Assert;
030import org.springframework.util.StringUtils;
031import org.springframework.web.socket.CloseStatus;
032import org.springframework.web.socket.TextMessage;
033import org.springframework.web.socket.WebSocketExtension;
034import org.springframework.web.socket.WebSocketHandler;
035import org.springframework.web.socket.WebSocketSession;
036import org.springframework.web.socket.adapter.NativeWebSocketSession;
037import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
038import org.springframework.web.socket.sockjs.frame.SockJsFrame;
039import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
040
041/**
042 * A SockJS session for use with the WebSocket transport.
043 *
044 * @author Rossen Stoyanchev
045 * @author Juergen Hoeller
046 * @since 4.0
047 */
048public class WebSocketServerSockJsSession extends AbstractSockJsSession implements NativeWebSocketSession {
049
050        private WebSocketSession webSocketSession;
051
052        private volatile boolean openFrameSent;
053
054        private final Queue<String> initSessionCache = new LinkedBlockingDeque<String>();
055
056        private final Object initSessionLock = new Object();
057
058        private final Object disconnectLock = new Object();
059
060        private volatile boolean disconnected;
061
062
063        public WebSocketServerSockJsSession(String id, SockJsServiceConfig config,
064                        WebSocketHandler handler, Map<String, Object> attributes) {
065
066                super(id, config, handler, attributes);
067        }
068
069
070        @Override
071        public URI getUri() {
072                checkDelegateSessionInitialized();
073                return this.webSocketSession.getUri();
074        }
075
076        @Override
077        public HttpHeaders getHandshakeHeaders() {
078                checkDelegateSessionInitialized();
079                return this.webSocketSession.getHandshakeHeaders();
080        }
081
082        @Override
083        public Principal getPrincipal() {
084                checkDelegateSessionInitialized();
085                return this.webSocketSession.getPrincipal();
086        }
087
088        @Override
089        public InetSocketAddress getLocalAddress() {
090                checkDelegateSessionInitialized();
091                return this.webSocketSession.getLocalAddress();
092        }
093
094        @Override
095        public InetSocketAddress getRemoteAddress() {
096                checkDelegateSessionInitialized();
097                return this.webSocketSession.getRemoteAddress();
098        }
099
100        @Override
101        public String getAcceptedProtocol() {
102                checkDelegateSessionInitialized();
103                return this.webSocketSession.getAcceptedProtocol();
104        }
105
106        @Override
107        public void setTextMessageSizeLimit(int messageSizeLimit) {
108                checkDelegateSessionInitialized();
109                this.webSocketSession.setTextMessageSizeLimit(messageSizeLimit);
110        }
111
112        @Override
113        public int getTextMessageSizeLimit() {
114                checkDelegateSessionInitialized();
115                return this.webSocketSession.getTextMessageSizeLimit();
116        }
117
118        @Override
119        public void setBinaryMessageSizeLimit(int messageSizeLimit) {
120                checkDelegateSessionInitialized();
121                this.webSocketSession.setBinaryMessageSizeLimit(messageSizeLimit);
122        }
123
124        @Override
125        public int getBinaryMessageSizeLimit() {
126                checkDelegateSessionInitialized();
127                return this.webSocketSession.getBinaryMessageSizeLimit();
128        }
129
130        @Override
131        public List<WebSocketExtension> getExtensions() {
132                checkDelegateSessionInitialized();
133                return this.webSocketSession.getExtensions();
134        }
135
136        private void checkDelegateSessionInitialized() {
137                Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized");
138        }
139
140        @Override
141        public Object getNativeSession() {
142                return (this.webSocketSession instanceof NativeWebSocketSession ?
143                                ((NativeWebSocketSession) this.webSocketSession).getNativeSession() : this.webSocketSession);
144        }
145
146        @Override
147        public <T> T getNativeSession(Class<T> requiredType) {
148                return (this.webSocketSession instanceof NativeWebSocketSession ?
149                                ((NativeWebSocketSession) this.webSocketSession).getNativeSession(requiredType) : null);
150        }
151
152
153        public void initializeDelegateSession(WebSocketSession session) {
154                synchronized (this.initSessionLock) {
155                        this.webSocketSession = session;
156                        try {
157                                // Let "our" handler know before sending the open frame to the remote handler
158                                delegateConnectionEstablished();
159                                this.webSocketSession.sendMessage(new TextMessage(SockJsFrame.openFrame().getContent()));
160
161                                // Flush any messages cached in the mean time
162                                while (!this.initSessionCache.isEmpty()) {
163                                        writeFrame(SockJsFrame.messageFrame(getMessageCodec(), this.initSessionCache.poll()));
164                                }
165                                scheduleHeartbeat();
166                                this.openFrameSent = true;
167                        }
168                        catch (Throwable ex) {
169                                tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
170                        }
171                }
172        }
173
174        @Override
175        public boolean isActive() {
176                return (this.webSocketSession != null && this.webSocketSession.isOpen() && !this.disconnected);
177        }
178
179        public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception {
180                String payload = message.getPayload();
181                if (StringUtils.isEmpty(payload)) {
182                        return;
183                }
184                String[] messages;
185                try {
186                        messages = getSockJsServiceConfig().getMessageCodec().decode(payload);
187                }
188                catch (Throwable ex) {
189                        logger.error("Broken data received. Terminating WebSocket connection abruptly", ex);
190                        tryCloseWithSockJsTransportError(ex, CloseStatus.BAD_DATA);
191                        return;
192                }
193                if (messages != null) {
194                        delegateMessages(messages);
195                }
196        }
197
198        @Override
199        public void sendMessageInternal(String message) throws SockJsTransportFailureException {
200                // Open frame not sent yet?
201                // If in the session initialization thread, then cache, otherwise wait.
202                if (!this.openFrameSent) {
203                        synchronized (this.initSessionLock) {
204                                if (!this.openFrameSent) {
205                                        this.initSessionCache.add(message);
206                                        return;
207                                }
208                        }
209                }
210
211                cancelHeartbeat();
212                writeFrame(SockJsFrame.messageFrame(getMessageCodec(), message));
213                scheduleHeartbeat();
214        }
215
216        @Override
217        protected void writeFrameInternal(SockJsFrame frame) throws IOException {
218                if (logger.isTraceEnabled()) {
219                        logger.trace("Writing " + frame);
220                }
221                TextMessage message = new TextMessage(frame.getContent());
222                this.webSocketSession.sendMessage(message);
223        }
224
225        @Override
226        protected void disconnect(CloseStatus status) throws IOException {
227                if (isActive()) {
228                        synchronized (this.disconnectLock) {
229                                if (isActive()) {
230                                        this.disconnected = true;
231                                        if (this.webSocketSession != null) {
232                                                this.webSocketSession.close(status);
233                                        }
234                                }
235                        }
236                }
237        }
238
239}