001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.transport.session; 018 019import java.io.IOException; 020import java.net.InetSocketAddress; 021import java.net.URI; 022import java.security.Principal; 023import java.util.List; 024import java.util.Map; 025import java.util.Queue; 026import java.util.concurrent.LinkedBlockingDeque; 027 028import org.springframework.http.HttpHeaders; 029import org.springframework.util.Assert; 030import org.springframework.util.StringUtils; 031import org.springframework.web.socket.CloseStatus; 032import org.springframework.web.socket.TextMessage; 033import org.springframework.web.socket.WebSocketExtension; 034import org.springframework.web.socket.WebSocketHandler; 035import org.springframework.web.socket.WebSocketSession; 036import org.springframework.web.socket.adapter.NativeWebSocketSession; 037import org.springframework.web.socket.sockjs.SockJsTransportFailureException; 038import org.springframework.web.socket.sockjs.frame.SockJsFrame; 039import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; 040 041/** 042 * A SockJS session for use with the WebSocket transport. 043 * 044 * @author Rossen Stoyanchev 045 * @author Juergen Hoeller 046 * @since 4.0 047 */ 048public class WebSocketServerSockJsSession extends AbstractSockJsSession implements NativeWebSocketSession { 049 050 private WebSocketSession webSocketSession; 051 052 private volatile boolean openFrameSent; 053 054 private final Queue<String> initSessionCache = new LinkedBlockingDeque<String>(); 055 056 private final Object initSessionLock = new Object(); 057 058 private final Object disconnectLock = new Object(); 059 060 private volatile boolean disconnected; 061 062 063 public WebSocketServerSockJsSession(String id, SockJsServiceConfig config, 064 WebSocketHandler handler, Map<String, Object> attributes) { 065 066 super(id, config, handler, attributes); 067 } 068 069 070 @Override 071 public URI getUri() { 072 checkDelegateSessionInitialized(); 073 return this.webSocketSession.getUri(); 074 } 075 076 @Override 077 public HttpHeaders getHandshakeHeaders() { 078 checkDelegateSessionInitialized(); 079 return this.webSocketSession.getHandshakeHeaders(); 080 } 081 082 @Override 083 public Principal getPrincipal() { 084 checkDelegateSessionInitialized(); 085 return this.webSocketSession.getPrincipal(); 086 } 087 088 @Override 089 public InetSocketAddress getLocalAddress() { 090 checkDelegateSessionInitialized(); 091 return this.webSocketSession.getLocalAddress(); 092 } 093 094 @Override 095 public InetSocketAddress getRemoteAddress() { 096 checkDelegateSessionInitialized(); 097 return this.webSocketSession.getRemoteAddress(); 098 } 099 100 @Override 101 public String getAcceptedProtocol() { 102 checkDelegateSessionInitialized(); 103 return this.webSocketSession.getAcceptedProtocol(); 104 } 105 106 @Override 107 public void setTextMessageSizeLimit(int messageSizeLimit) { 108 checkDelegateSessionInitialized(); 109 this.webSocketSession.setTextMessageSizeLimit(messageSizeLimit); 110 } 111 112 @Override 113 public int getTextMessageSizeLimit() { 114 checkDelegateSessionInitialized(); 115 return this.webSocketSession.getTextMessageSizeLimit(); 116 } 117 118 @Override 119 public void setBinaryMessageSizeLimit(int messageSizeLimit) { 120 checkDelegateSessionInitialized(); 121 this.webSocketSession.setBinaryMessageSizeLimit(messageSizeLimit); 122 } 123 124 @Override 125 public int getBinaryMessageSizeLimit() { 126 checkDelegateSessionInitialized(); 127 return this.webSocketSession.getBinaryMessageSizeLimit(); 128 } 129 130 @Override 131 public List<WebSocketExtension> getExtensions() { 132 checkDelegateSessionInitialized(); 133 return this.webSocketSession.getExtensions(); 134 } 135 136 private void checkDelegateSessionInitialized() { 137 Assert.state(this.webSocketSession != null, "WebSocketSession not yet initialized"); 138 } 139 140 @Override 141 public Object getNativeSession() { 142 return (this.webSocketSession instanceof NativeWebSocketSession ? 143 ((NativeWebSocketSession) this.webSocketSession).getNativeSession() : this.webSocketSession); 144 } 145 146 @Override 147 public <T> T getNativeSession(Class<T> requiredType) { 148 return (this.webSocketSession instanceof NativeWebSocketSession ? 149 ((NativeWebSocketSession) this.webSocketSession).getNativeSession(requiredType) : null); 150 } 151 152 153 public void initializeDelegateSession(WebSocketSession session) { 154 synchronized (this.initSessionLock) { 155 this.webSocketSession = session; 156 try { 157 // Let "our" handler know before sending the open frame to the remote handler 158 delegateConnectionEstablished(); 159 this.webSocketSession.sendMessage(new TextMessage(SockJsFrame.openFrame().getContent())); 160 161 // Flush any messages cached in the mean time 162 while (!this.initSessionCache.isEmpty()) { 163 writeFrame(SockJsFrame.messageFrame(getMessageCodec(), this.initSessionCache.poll())); 164 } 165 scheduleHeartbeat(); 166 this.openFrameSent = true; 167 } 168 catch (Throwable ex) { 169 tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); 170 } 171 } 172 } 173 174 @Override 175 public boolean isActive() { 176 return (this.webSocketSession != null && this.webSocketSession.isOpen() && !this.disconnected); 177 } 178 179 public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception { 180 String payload = message.getPayload(); 181 if (StringUtils.isEmpty(payload)) { 182 return; 183 } 184 String[] messages; 185 try { 186 messages = getSockJsServiceConfig().getMessageCodec().decode(payload); 187 } 188 catch (Throwable ex) { 189 logger.error("Broken data received. Terminating WebSocket connection abruptly", ex); 190 tryCloseWithSockJsTransportError(ex, CloseStatus.BAD_DATA); 191 return; 192 } 193 if (messages != null) { 194 delegateMessages(messages); 195 } 196 } 197 198 @Override 199 public void sendMessageInternal(String message) throws SockJsTransportFailureException { 200 // Open frame not sent yet? 201 // If in the session initialization thread, then cache, otherwise wait. 202 if (!this.openFrameSent) { 203 synchronized (this.initSessionLock) { 204 if (!this.openFrameSent) { 205 this.initSessionCache.add(message); 206 return; 207 } 208 } 209 } 210 211 cancelHeartbeat(); 212 writeFrame(SockJsFrame.messageFrame(getMessageCodec(), message)); 213 scheduleHeartbeat(); 214 } 215 216 @Override 217 protected void writeFrameInternal(SockJsFrame frame) throws IOException { 218 if (logger.isTraceEnabled()) { 219 logger.trace("Writing " + frame); 220 } 221 TextMessage message = new TextMessage(frame.getContent()); 222 this.webSocketSession.sendMessage(message); 223 } 224 225 @Override 226 protected void disconnect(CloseStatus status) throws IOException { 227 if (isActive()) { 228 synchronized (this.disconnectLock) { 229 if (isActive()) { 230 this.disconnected = true; 231 if (this.webSocketSession != null) { 232 this.webSocketSession.close(status); 233 } 234 } 235 } 236 } 237 } 238 239}