001/* 002 * Copyright 2002-2016 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.transport.session; 018 019import java.io.IOException; 020import java.net.InetSocketAddress; 021import java.net.URI; 022import java.security.Principal; 023import java.util.Collections; 024import java.util.List; 025import java.util.Map; 026import java.util.Queue; 027import java.util.concurrent.LinkedBlockingQueue; 028 029import javax.servlet.ServletRequest; 030 031import org.springframework.http.HttpHeaders; 032import org.springframework.http.server.ServerHttpAsyncRequestControl; 033import org.springframework.http.server.ServerHttpRequest; 034import org.springframework.http.server.ServerHttpResponse; 035import org.springframework.http.server.ServletServerHttpRequest; 036import org.springframework.web.filter.ShallowEtagHeaderFilter; 037import org.springframework.web.socket.CloseStatus; 038import org.springframework.web.socket.WebSocketExtension; 039import org.springframework.web.socket.WebSocketHandler; 040import org.springframework.web.socket.sockjs.SockJsException; 041import org.springframework.web.socket.sockjs.SockJsTransportFailureException; 042import org.springframework.web.socket.sockjs.frame.SockJsFrame; 043import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; 044import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; 045 046/** 047 * An abstract base class for use with HTTP transport SockJS sessions. 048 * 049 * @author Rossen Stoyanchev 050 * @since 4.0 051 */ 052public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { 053 054 private final Queue<String> messageCache; 055 056 private volatile URI uri; 057 058 private volatile HttpHeaders handshakeHeaders; 059 060 private volatile Principal principal; 061 062 private volatile InetSocketAddress localAddress; 063 064 private volatile InetSocketAddress remoteAddress; 065 066 private volatile String acceptedProtocol; 067 068 private volatile ServerHttpResponse response; 069 070 private volatile SockJsFrameFormat frameFormat; 071 072 private volatile ServerHttpAsyncRequestControl asyncRequestControl; 073 074 private boolean readyToSend; 075 076 077 public AbstractHttpSockJsSession(String id, SockJsServiceConfig config, 078 WebSocketHandler wsHandler, Map<String, Object> attributes) { 079 080 super(id, config, wsHandler, attributes); 081 this.messageCache = new LinkedBlockingQueue<String>(config.getHttpMessageCacheSize()); 082 } 083 084 085 @Override 086 public URI getUri() { 087 return this.uri; 088 } 089 090 @Override 091 public HttpHeaders getHandshakeHeaders() { 092 return this.handshakeHeaders; 093 } 094 095 @Override 096 public Principal getPrincipal() { 097 return this.principal; 098 } 099 100 @Override 101 public InetSocketAddress getLocalAddress() { 102 return this.localAddress; 103 } 104 105 @Override 106 public InetSocketAddress getRemoteAddress() { 107 return this.remoteAddress; 108 } 109 110 /** 111 * Unlike WebSocket where sub-protocol negotiation is part of the initial 112 * handshake, in HTTP transports the same negotiation must be emulated and 113 * the selected protocol set through this setter. 114 * @param protocol the sub-protocol to set 115 */ 116 public void setAcceptedProtocol(String protocol) { 117 this.acceptedProtocol = protocol; 118 } 119 120 /** 121 * Return the selected sub-protocol to use. 122 */ 123 public String getAcceptedProtocol() { 124 return this.acceptedProtocol; 125 } 126 127 /** 128 * Return the SockJS buffer for messages stored transparently between polling 129 * requests. If the polling request takes longer than 5 seconds, the session 130 * is closed. 131 * @see org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsService 132 */ 133 protected Queue<String> getMessageCache() { 134 return this.messageCache; 135 } 136 137 @Override 138 public boolean isActive() { 139 ServerHttpAsyncRequestControl control = this.asyncRequestControl; 140 return (control != null && !control.isCompleted()); 141 } 142 143 @Override 144 public void setTextMessageSizeLimit(int messageSizeLimit) { 145 // ignore 146 } 147 148 @Override 149 public int getTextMessageSizeLimit() { 150 return -1; 151 } 152 153 @Override 154 public void setBinaryMessageSizeLimit(int messageSizeLimit) { 155 // ignore 156 } 157 158 @Override 159 public int getBinaryMessageSizeLimit() { 160 return -1; 161 } 162 163 @Override 164 public List<WebSocketExtension> getExtensions() { 165 return Collections.emptyList(); 166 } 167 168 /** 169 * @deprecated as of 4.2, since this method is no longer used. 170 */ 171 @Deprecated 172 protected abstract boolean isStreaming(); 173 174 175 /** 176 * Handle the first request for receiving messages on a SockJS HTTP transport 177 * based session. 178 * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request 179 * after writing the open frame. Streaming-based transports ("xhr_streaming", 180 * "eventsource", and "htmlfile") leave the response open longer for further 181 * streaming of message frames but will also close it eventually after some 182 * amount of data has been sent. 183 * @param request the current request 184 * @param response the current response 185 * @param frameFormat the transport-specific SocksJS frame format to use 186 */ 187 public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response, 188 SockJsFrameFormat frameFormat) throws SockJsException { 189 190 this.uri = request.getURI(); 191 this.handshakeHeaders = request.getHeaders(); 192 this.principal = request.getPrincipal(); 193 try { 194 this.localAddress = request.getLocalAddress(); 195 } 196 catch (Exception ex) { 197 // Ignore 198 } 199 try { 200 this.remoteAddress = request.getRemoteAddress(); 201 } 202 catch (Exception ex) { 203 // Ignore 204 } 205 206 synchronized (this.responseLock) { 207 try { 208 this.response = response; 209 this.frameFormat = frameFormat; 210 this.asyncRequestControl = request.getAsyncRequestControl(response); 211 this.asyncRequestControl.start(-1); 212 disableShallowEtagHeaderFilter(request); 213 // Let "our" handler know before sending the open frame to the remote handler 214 delegateConnectionEstablished(); 215 handleRequestInternal(request, response, true); 216 // Request might have been reset (e.g. polling sessions do after writing) 217 this.readyToSend = isActive(); 218 } 219 catch (Throwable ex) { 220 tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); 221 throw new SockJsTransportFailureException("Failed to open session", getId(), ex); 222 } 223 } 224 } 225 226 /** 227 * Handle all requests, except the first one, to receive messages on a SockJS 228 * HTTP transport based session. 229 * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request 230 * after writing any buffered message frames (or the next one). Streaming-based 231 * transports ("xhr_streaming", "eventsource", and "htmlfile") leave the 232 * response open longer for further streaming of message frames but will also 233 * close it eventually after some amount of data has been sent. 234 * @param request the current request 235 * @param response the current response 236 * @param frameFormat the transport-specific SocksJS frame format to use 237 */ 238 public void handleSuccessiveRequest(ServerHttpRequest request, ServerHttpResponse response, 239 SockJsFrameFormat frameFormat) throws SockJsException { 240 241 synchronized (this.responseLock) { 242 try { 243 if (isClosed()) { 244 response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes()); 245 return; 246 } 247 this.response = response; 248 this.frameFormat = frameFormat; 249 this.asyncRequestControl = request.getAsyncRequestControl(response); 250 this.asyncRequestControl.start(-1); 251 disableShallowEtagHeaderFilter(request); 252 handleRequestInternal(request, response, false); 253 this.readyToSend = isActive(); 254 } 255 catch (Throwable ex) { 256 tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); 257 throw new SockJsTransportFailureException("Failed to handle SockJS receive request", getId(), ex); 258 } 259 } 260 } 261 262 private void disableShallowEtagHeaderFilter(ServerHttpRequest request) { 263 if (request instanceof ServletServerHttpRequest) { 264 ServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); 265 ShallowEtagHeaderFilter.disableContentCaching(servletRequest); 266 } 267 } 268 269 /** 270 * Invoked when a SockJS transport request is received. 271 * @param request the current request 272 * @param response the current response 273 * @param initialRequest whether it is the first request for the session 274 */ 275 protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response, 276 boolean initialRequest) throws IOException; 277 278 @Override 279 protected final void sendMessageInternal(String message) throws SockJsTransportFailureException { 280 synchronized (this.responseLock) { 281 this.messageCache.add(message); 282 if (logger.isTraceEnabled()) { 283 logger.trace(this.messageCache.size() + " message(s) to flush in session " + this.getId()); 284 } 285 if (isActive() && this.readyToSend) { 286 if (logger.isTraceEnabled()) { 287 logger.trace("Session is active, ready to flush."); 288 } 289 cancelHeartbeat(); 290 flushCache(); 291 } 292 else { 293 if (logger.isTraceEnabled()) { 294 logger.trace("Session is not active, not ready to flush."); 295 } 296 } 297 } 298 } 299 300 /** 301 * Called when the connection is active and ready to write to the response. 302 * Subclasses should only call this method from a method where the 303 * "responseLock" is acquired. 304 */ 305 protected abstract void flushCache() throws SockJsTransportFailureException; 306 307 308 /** 309 * @deprecated as of 4.2 this method is deprecated since the prelude is written 310 * in {@link #handleRequestInternal} of the StreamingSockJsSession subclass. 311 */ 312 @Deprecated 313 protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException { 314 } 315 316 @Override 317 protected void disconnect(CloseStatus status) { 318 resetRequest(); 319 } 320 321 protected void resetRequest() { 322 synchronized (this.responseLock) { 323 ServerHttpAsyncRequestControl control = this.asyncRequestControl; 324 this.asyncRequestControl = null; 325 this.readyToSend = false; 326 this.response = null; 327 updateLastActiveTime(); 328 if (control != null && !control.isCompleted()) { 329 if (control.isStarted()) { 330 try { 331 control.complete(); 332 } 333 catch (Throwable ex) { 334 // Could be part of normal workflow (e.g. browser tab closed) 335 logger.debug("Failed to complete request: " + ex.getMessage()); 336 } 337 } 338 } 339 } 340 } 341 342 @Override 343 protected void writeFrameInternal(SockJsFrame frame) throws IOException { 344 if (isActive()) { 345 String formattedFrame = this.frameFormat.format(frame); 346 if (logger.isTraceEnabled()) { 347 logger.trace("Writing to HTTP response: " + formattedFrame); 348 } 349 this.response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET)); 350 this.response.flush(); 351 } 352 } 353 354}