001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.transport.session; 018 019import java.io.IOException; 020import java.net.InetSocketAddress; 021import java.net.URI; 022import java.security.Principal; 023import java.util.Collections; 024import java.util.List; 025import java.util.Map; 026import java.util.Queue; 027import java.util.concurrent.LinkedBlockingQueue; 028 029import javax.servlet.ServletRequest; 030 031import org.springframework.http.HttpHeaders; 032import org.springframework.http.server.ServerHttpAsyncRequestControl; 033import org.springframework.http.server.ServerHttpRequest; 034import org.springframework.http.server.ServerHttpResponse; 035import org.springframework.http.server.ServletServerHttpRequest; 036import org.springframework.lang.Nullable; 037import org.springframework.util.Assert; 038import org.springframework.web.filter.ShallowEtagHeaderFilter; 039import org.springframework.web.socket.CloseStatus; 040import org.springframework.web.socket.WebSocketExtension; 041import org.springframework.web.socket.WebSocketHandler; 042import org.springframework.web.socket.sockjs.SockJsException; 043import org.springframework.web.socket.sockjs.SockJsTransportFailureException; 044import org.springframework.web.socket.sockjs.frame.SockJsFrame; 045import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; 046import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; 047 048/** 049 * An abstract base class for use with HTTP transport SockJS sessions. 050 * 051 * @author Rossen Stoyanchev 052 * @since 4.0 053 */ 054public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { 055 056 private final Queue<String> messageCache; 057 058 @Nullable 059 private volatile URI uri; 060 061 @Nullable 062 private volatile HttpHeaders handshakeHeaders; 063 064 @Nullable 065 private volatile Principal principal; 066 067 @Nullable 068 private volatile InetSocketAddress localAddress; 069 070 @Nullable 071 private volatile InetSocketAddress remoteAddress; 072 073 @Nullable 074 private volatile String acceptedProtocol; 075 076 @Nullable 077 private volatile ServerHttpResponse response; 078 079 @Nullable 080 private volatile SockJsFrameFormat frameFormat; 081 082 @Nullable 083 private volatile ServerHttpAsyncRequestControl asyncRequestControl; 084 085 private boolean readyToSend; 086 087 088 public AbstractHttpSockJsSession(String id, SockJsServiceConfig config, 089 WebSocketHandler wsHandler, Map<String, Object> attributes) { 090 091 super(id, config, wsHandler, attributes); 092 this.messageCache = new LinkedBlockingQueue<>(config.getHttpMessageCacheSize()); 093 } 094 095 096 @Override 097 public URI getUri() { 098 URI uri = this.uri; 099 Assert.state(uri != null, "No initial request yet"); 100 return uri; 101 } 102 103 @Override 104 public HttpHeaders getHandshakeHeaders() { 105 HttpHeaders headers = this.handshakeHeaders; 106 Assert.state(headers != null, "No initial request yet"); 107 return headers; 108 } 109 110 @Override 111 @Nullable 112 public Principal getPrincipal() { 113 return this.principal; 114 } 115 116 @Override 117 @Nullable 118 public InetSocketAddress getLocalAddress() { 119 return this.localAddress; 120 } 121 122 @Override 123 @Nullable 124 public InetSocketAddress getRemoteAddress() { 125 return this.remoteAddress; 126 } 127 128 /** 129 * Unlike WebSocket where sub-protocol negotiation is part of the initial 130 * handshake, in HTTP transports the same negotiation must be emulated and 131 * the selected protocol set through this setter. 132 * @param protocol the sub-protocol to set 133 */ 134 public void setAcceptedProtocol(@Nullable String protocol) { 135 this.acceptedProtocol = protocol; 136 } 137 138 /** 139 * Return the selected sub-protocol to use. 140 */ 141 @Override 142 @Nullable 143 public String getAcceptedProtocol() { 144 return this.acceptedProtocol; 145 } 146 147 /** 148 * Return the SockJS buffer for messages stored transparently between polling 149 * requests. If the polling request takes longer than 5 seconds, the session 150 * is closed. 151 * @see org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsService 152 */ 153 protected Queue<String> getMessageCache() { 154 return this.messageCache; 155 } 156 157 158 @Override 159 public boolean isActive() { 160 ServerHttpAsyncRequestControl control = this.asyncRequestControl; 161 return (control != null && !control.isCompleted()); 162 } 163 164 @Override 165 public void setTextMessageSizeLimit(int messageSizeLimit) { 166 // ignore 167 } 168 169 @Override 170 public int getTextMessageSizeLimit() { 171 return -1; 172 } 173 174 @Override 175 public void setBinaryMessageSizeLimit(int messageSizeLimit) { 176 // ignore 177 } 178 179 @Override 180 public int getBinaryMessageSizeLimit() { 181 return -1; 182 } 183 184 @Override 185 public List<WebSocketExtension> getExtensions() { 186 return Collections.emptyList(); 187 } 188 189 190 /** 191 * Handle the first request for receiving messages on a SockJS HTTP transport 192 * based session. 193 * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request 194 * after writing the open frame. Streaming-based transports ("xhr_streaming", 195 * "eventsource", and "htmlfile") leave the response open longer for further 196 * streaming of message frames but will also close it eventually after some 197 * amount of data has been sent. 198 * @param request the current request 199 * @param response the current response 200 * @param frameFormat the transport-specific SocksJS frame format to use 201 */ 202 public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response, 203 SockJsFrameFormat frameFormat) throws SockJsException { 204 205 this.uri = request.getURI(); 206 this.handshakeHeaders = request.getHeaders(); 207 this.principal = request.getPrincipal(); 208 try { 209 this.localAddress = request.getLocalAddress(); 210 } 211 catch (Exception ex) { 212 // Ignore 213 } 214 try { 215 this.remoteAddress = request.getRemoteAddress(); 216 } 217 catch (Exception ex) { 218 // Ignore 219 } 220 221 synchronized (this.responseLock) { 222 try { 223 this.response = response; 224 this.frameFormat = frameFormat; 225 ServerHttpAsyncRequestControl control = request.getAsyncRequestControl(response); 226 this.asyncRequestControl = control; 227 control.start(-1); 228 disableShallowEtagHeaderFilter(request); 229 // Let "our" handler know before sending the open frame to the remote handler 230 delegateConnectionEstablished(); 231 handleRequestInternal(request, response, true); 232 // Request might have been reset (e.g. polling sessions do after writing) 233 this.readyToSend = isActive(); 234 } 235 catch (Throwable ex) { 236 tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); 237 throw new SockJsTransportFailureException("Failed to open session", getId(), ex); 238 } 239 } 240 } 241 242 /** 243 * Handle all requests, except the first one, to receive messages on a SockJS 244 * HTTP transport based session. 245 * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request 246 * after writing any buffered message frames (or the next one). Streaming-based 247 * transports ("xhr_streaming", "eventsource", and "htmlfile") leave the 248 * response open longer for further streaming of message frames but will also 249 * close it eventually after some amount of data has been sent. 250 * @param request the current request 251 * @param response the current response 252 * @param frameFormat the transport-specific SocksJS frame format to use 253 */ 254 public void handleSuccessiveRequest(ServerHttpRequest request, ServerHttpResponse response, 255 SockJsFrameFormat frameFormat) throws SockJsException { 256 257 synchronized (this.responseLock) { 258 try { 259 if (isClosed()) { 260 response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes()); 261 return; 262 } 263 this.response = response; 264 this.frameFormat = frameFormat; 265 ServerHttpAsyncRequestControl control = request.getAsyncRequestControl(response); 266 this.asyncRequestControl = control; 267 control.start(-1); 268 disableShallowEtagHeaderFilter(request); 269 handleRequestInternal(request, response, false); 270 this.readyToSend = isActive(); 271 } 272 catch (Throwable ex) { 273 tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); 274 throw new SockJsTransportFailureException("Failed to handle SockJS receive request", getId(), ex); 275 } 276 } 277 } 278 279 private void disableShallowEtagHeaderFilter(ServerHttpRequest request) { 280 if (request instanceof ServletServerHttpRequest) { 281 ServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); 282 ShallowEtagHeaderFilter.disableContentCaching(servletRequest); 283 } 284 } 285 286 /** 287 * Invoked when a SockJS transport request is received. 288 * @param request the current request 289 * @param response the current response 290 * @param initialRequest whether it is the first request for the session 291 */ 292 protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, 293 boolean initialRequest) throws IOException; 294 295 @Override 296 protected final void sendMessageInternal(String message) throws SockJsTransportFailureException { 297 synchronized (this.responseLock) { 298 this.messageCache.add(message); 299 if (logger.isTraceEnabled()) { 300 logger.trace(this.messageCache.size() + " message(s) to flush in session " + getId()); 301 } 302 if (isActive() && this.readyToSend) { 303 if (logger.isTraceEnabled()) { 304 logger.trace("Session is active, ready to flush."); 305 } 306 cancelHeartbeat(); 307 flushCache(); 308 } 309 else { 310 if (logger.isTraceEnabled()) { 311 logger.trace("Session is not active, not ready to flush."); 312 } 313 } 314 } 315 } 316 317 /** 318 * Called when the connection is active and ready to write to the response. 319 * Subclasses should only call this method from a method where the 320 * "responseLock" is acquired. 321 */ 322 protected abstract void flushCache() throws SockJsTransportFailureException; 323 324 325 @Override 326 protected void disconnect(CloseStatus status) { 327 resetRequest(); 328 } 329 330 protected void resetRequest() { 331 synchronized (this.responseLock) { 332 ServerHttpAsyncRequestControl control = this.asyncRequestControl; 333 this.asyncRequestControl = null; 334 this.readyToSend = false; 335 this.response = null; 336 updateLastActiveTime(); 337 if (control != null && !control.isCompleted() && control.isStarted()) { 338 try { 339 control.complete(); 340 } 341 catch (Throwable ex) { 342 // Could be part of normal workflow (e.g. browser tab closed) 343 logger.debug("Failed to complete request: " + ex.getMessage()); 344 } 345 } 346 } 347 } 348 349 @Override 350 protected void writeFrameInternal(SockJsFrame frame) throws IOException { 351 if (isActive()) { 352 SockJsFrameFormat frameFormat = this.frameFormat; 353 ServerHttpResponse response = this.response; 354 if (frameFormat != null && response != null) { 355 String formattedFrame = frameFormat.format(frame); 356 if (logger.isTraceEnabled()) { 357 logger.trace("Writing to HTTP response: " + formattedFrame); 358 } 359 response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET)); 360 response.flush(); 361 } 362 } 363 } 364 365}