001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.transport.session; 018 019import java.io.IOException; 020import java.net.InetSocketAddress; 021import java.net.URI; 022import java.security.Principal; 023import java.util.List; 024import java.util.Map; 025import java.util.Queue; 026import java.util.concurrent.LinkedBlockingDeque; 027 028import org.springframework.http.HttpHeaders; 029import org.springframework.lang.Nullable; 030import org.springframework.util.Assert; 031import org.springframework.util.StringUtils; 032import org.springframework.web.socket.CloseStatus; 033import org.springframework.web.socket.TextMessage; 034import org.springframework.web.socket.WebSocketExtension; 035import org.springframework.web.socket.WebSocketHandler; 036import org.springframework.web.socket.WebSocketSession; 037import org.springframework.web.socket.adapter.NativeWebSocketSession; 038import org.springframework.web.socket.sockjs.SockJsTransportFailureException; 039import org.springframework.web.socket.sockjs.frame.SockJsFrame; 040import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; 041 042/** 043 * A SockJS session for use with the WebSocket transport. 044 * 045 * @author Rossen Stoyanchev 046 * @author Juergen Hoeller 047 * @since 4.0 048 */ 049public class WebSocketServerSockJsSession extends AbstractSockJsSession implements NativeWebSocketSession { 050 051 @Nullable 052 private WebSocketSession webSocketSession; 053 054 private volatile boolean openFrameSent; 055 056 private final Queue<String> initSessionCache = new LinkedBlockingDeque<>(); 057 058 private final Object initSessionLock = new Object(); 059 060 private final Object disconnectLock = new Object(); 061 062 private volatile boolean disconnected; 063 064 065 public WebSocketServerSockJsSession(String id, SockJsServiceConfig config, 066 WebSocketHandler handler, @Nullable Map<String, Object> attributes) { 067 068 super(id, config, handler, attributes); 069 } 070 071 072 @Override 073 @Nullable 074 public URI getUri() { 075 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 076 return this.webSocketSession.getUri(); 077 } 078 079 @Override 080 public HttpHeaders getHandshakeHeaders() { 081 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 082 return this.webSocketSession.getHandshakeHeaders(); 083 } 084 085 @Override 086 public Principal getPrincipal() { 087 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 088 return this.webSocketSession.getPrincipal(); 089 } 090 091 @Override 092 public InetSocketAddress getLocalAddress() { 093 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 094 return this.webSocketSession.getLocalAddress(); 095 } 096 097 @Override 098 public InetSocketAddress getRemoteAddress() { 099 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 100 return this.webSocketSession.getRemoteAddress(); 101 } 102 103 @Override 104 public String getAcceptedProtocol() { 105 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 106 return this.webSocketSession.getAcceptedProtocol(); 107 } 108 109 @Override 110 public void setTextMessageSizeLimit(int messageSizeLimit) { 111 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 112 this.webSocketSession.setTextMessageSizeLimit(messageSizeLimit); 113 } 114 115 @Override 116 public int getTextMessageSizeLimit() { 117 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 118 return this.webSocketSession.getTextMessageSizeLimit(); 119 } 120 121 @Override 122 public void setBinaryMessageSizeLimit(int messageSizeLimit) { 123 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 124 this.webSocketSession.setBinaryMessageSizeLimit(messageSizeLimit); 125 } 126 127 @Override 128 public int getBinaryMessageSizeLimit() { 129 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 130 return this.webSocketSession.getBinaryMessageSizeLimit(); 131 } 132 133 @Override 134 public List<WebSocketExtension> getExtensions() { 135 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 136 return this.webSocketSession.getExtensions(); 137 } 138 139 @Override 140 public Object getNativeSession() { 141 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 142 return (this.webSocketSession instanceof NativeWebSocketSession ? 143 ((NativeWebSocketSession) this.webSocketSession).getNativeSession() : this.webSocketSession); 144 } 145 146 @Override 147 @Nullable 148 public <T> T getNativeSession(@Nullable Class<T> requiredType) { 149 return (this.webSocketSession instanceof NativeWebSocketSession ? 150 ((NativeWebSocketSession) this.webSocketSession).getNativeSession(requiredType) : null); 151 } 152 153 154 public void initializeDelegateSession(WebSocketSession session) { 155 synchronized (this.initSessionLock) { 156 this.webSocketSession = session; 157 try { 158 // Let "our" handler know before sending the open frame to the remote handler 159 delegateConnectionEstablished(); 160 this.webSocketSession.sendMessage(new TextMessage(SockJsFrame.openFrame().getContent())); 161 162 // Flush any messages cached in the mean time 163 while (!this.initSessionCache.isEmpty()) { 164 writeFrame(SockJsFrame.messageFrame(getMessageCodec(), this.initSessionCache.poll())); 165 } 166 scheduleHeartbeat(); 167 this.openFrameSent = true; 168 } 169 catch (Exception ex) { 170 tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); 171 } 172 } 173 } 174 175 @Override 176 public boolean isActive() { 177 return (this.webSocketSession != null && this.webSocketSession.isOpen() && !this.disconnected); 178 } 179 180 public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception { 181 String payload = message.getPayload(); 182 if (!StringUtils.hasLength(payload)) { 183 return; 184 } 185 String[] messages; 186 try { 187 messages = getSockJsServiceConfig().getMessageCodec().decode(payload); 188 } 189 catch (Exception ex) { 190 logger.error("Broken data received. Terminating WebSocket connection abruptly", ex); 191 tryCloseWithSockJsTransportError(ex, CloseStatus.BAD_DATA); 192 return; 193 } 194 if (messages != null) { 195 delegateMessages(messages); 196 } 197 } 198 199 @Override 200 public void sendMessageInternal(String message) throws SockJsTransportFailureException { 201 // Open frame not sent yet? 202 // If in the session initialization thread, then cache, otherwise wait. 203 if (!this.openFrameSent) { 204 synchronized (this.initSessionLock) { 205 if (!this.openFrameSent) { 206 this.initSessionCache.add(message); 207 return; 208 } 209 } 210 } 211 212 cancelHeartbeat(); 213 writeFrame(SockJsFrame.messageFrame(getMessageCodec(), message)); 214 scheduleHeartbeat(); 215 } 216 217 @Override 218 protected void writeFrameInternal(SockJsFrame frame) throws IOException { 219 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 220 if (logger.isTraceEnabled()) { 221 logger.trace("Writing " + frame); 222 } 223 TextMessage message = new TextMessage(frame.getContent()); 224 this.webSocketSession.sendMessage(message); 225 } 226 227 @Override 228 protected void disconnect(CloseStatus status) throws IOException { 229 if (isActive()) { 230 synchronized (this.disconnectLock) { 231 if (isActive()) { 232 this.disconnected = true; 233 if (this.webSocketSession != null) { 234 this.webSocketSession.close(status); 235 } 236 } 237 } 238 } 239 } 240 241}