001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.transport.session; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Date; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ScheduledFuture; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032 033import org.springframework.core.NestedExceptionUtils; 034import org.springframework.util.Assert; 035import org.springframework.web.socket.CloseStatus; 036import org.springframework.web.socket.TextMessage; 037import org.springframework.web.socket.WebSocketHandler; 038import org.springframework.web.socket.WebSocketMessage; 039import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException; 040import org.springframework.web.socket.sockjs.SockJsTransportFailureException; 041import org.springframework.web.socket.sockjs.frame.SockJsFrame; 042import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; 043import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; 044import org.springframework.web.socket.sockjs.transport.SockJsSession; 045 046/** 047 * An abstract base class for SockJS sessions implementing {@link SockJsSession}. 048 * 049 * @author Rossen Stoyanchev 050 * @author Sam Brannen 051 * @since 4.0 052 */ 053public abstract class AbstractSockJsSession implements SockJsSession { 054 055 private enum State {NEW, OPEN, CLOSED} 056 057 058 /** 059 * Log category to use on network IO exceptions after a client has gone away. 060 * <p>The Servlet API does not provide notifications when a client disconnects; 061 * see <a href="https://java.net/jira/browse/SERVLET_SPEC-44">SERVLET_SPEC-44</a>. 062 * Therefore network IO failures may occur simply because a client has gone away, 063 * and that can fill the logs with unnecessary stack traces. 064 * <p>We make a best effort to identify such network failures, on a per-server 065 * basis, and log them under a separate log category. A simple one-line message 066 * is logged at DEBUG level, while a full stack trace is shown at TRACE level. 067 * @see #disconnectedClientLogger 068 */ 069 public static final String DISCONNECTED_CLIENT_LOG_CATEGORY = 070 "org.springframework.web.socket.sockjs.DisconnectedClient"; 071 072 /** 073 * Tomcat: ClientAbortException or EOFException 074 * Jetty: EofException 075 * WildFly, GlassFish: java.io.IOException "Broken pipe" (already covered) 076 * @see #indicatesDisconnectedClient(Throwable) 077 */ 078 private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = 079 new HashSet<String>(Arrays.asList("ClientAbortException", "EOFException", "EofException")); 080 081 082 /** 083 * Separate logger to use on network IO failure after a client has gone away. 084 * @see #DISCONNECTED_CLIENT_LOG_CATEGORY 085 */ 086 protected static final Log disconnectedClientLogger = LogFactory.getLog(DISCONNECTED_CLIENT_LOG_CATEGORY); 087 088 protected final Log logger = LogFactory.getLog(getClass()); 089 090 protected final Object responseLock = new Object(); 091 092 private final String id; 093 094 private final SockJsServiceConfig config; 095 096 private final WebSocketHandler handler; 097 098 private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>(); 099 100 private volatile State state = State.NEW; 101 102 private final long timeCreated = System.currentTimeMillis(); 103 104 private volatile long timeLastActive = this.timeCreated; 105 106 private ScheduledFuture<?> heartbeatFuture; 107 108 private HeartbeatTask heartbeatTask; 109 110 private volatile boolean heartbeatDisabled; 111 112 113 /** 114 * Create a new instance. 115 * @param id the session ID 116 * @param config SockJS service configuration options 117 * @param handler the recipient of SockJS messages 118 * @param attributes attributes from the HTTP handshake to associate with the WebSocket 119 * session; the provided attributes are copied, the original map is not used. 120 */ 121 public AbstractSockJsSession(String id, SockJsServiceConfig config, WebSocketHandler handler, 122 Map<String, Object> attributes) { 123 124 Assert.notNull(id, "Session id must not be null"); 125 Assert.notNull(config, "SockJsServiceConfig must not be null"); 126 Assert.notNull(handler, "WebSocketHandler must not be null"); 127 128 this.id = id; 129 this.config = config; 130 this.handler = handler; 131 132 if (attributes != null) { 133 this.attributes.putAll(attributes); 134 } 135 } 136 137 138 @Override 139 public String getId() { 140 return this.id; 141 } 142 143 protected SockJsMessageCodec getMessageCodec() { 144 return this.config.getMessageCodec(); 145 } 146 147 public SockJsServiceConfig getSockJsServiceConfig() { 148 return this.config; 149 } 150 151 @Override 152 public Map<String, Object> getAttributes() { 153 return this.attributes; 154 } 155 156 157 // Message sending 158 159 public final void sendMessage(WebSocketMessage<?> message) throws IOException { 160 Assert.state(!isClosed(), "Cannot send a message when session is closed"); 161 Assert.isInstanceOf(TextMessage.class, message, "SockJS supports text messages only"); 162 sendMessageInternal(((TextMessage) message).getPayload()); 163 } 164 165 protected abstract void sendMessageInternal(String message) throws IOException; 166 167 168 // Lifecycle related methods 169 170 public boolean isNew() { 171 return State.NEW.equals(this.state); 172 } 173 174 @Override 175 public boolean isOpen() { 176 return State.OPEN.equals(this.state); 177 } 178 179 public boolean isClosed() { 180 return State.CLOSED.equals(this.state); 181 } 182 183 /** 184 * Performs cleanup and notify the {@link WebSocketHandler}. 185 */ 186 @Override 187 public final void close() throws IOException { 188 close(new CloseStatus(3000, "Go away!")); 189 } 190 191 /** 192 * Performs cleanup and notify the {@link WebSocketHandler}. 193 */ 194 @Override 195 public final void close(CloseStatus status) throws IOException { 196 if (isOpen()) { 197 if (logger.isDebugEnabled()) { 198 logger.debug("Closing SockJS session " + getId() + " with " + status); 199 } 200 this.state = State.CLOSED; 201 try { 202 if (isActive() && !CloseStatus.SESSION_NOT_RELIABLE.equals(status)) { 203 try { 204 writeFrameInternal(SockJsFrame.closeFrame(status.getCode(), status.getReason())); 205 } 206 catch (Throwable ex) { 207 logger.debug("Failure while sending SockJS close frame", ex); 208 } 209 } 210 updateLastActiveTime(); 211 cancelHeartbeat(); 212 disconnect(status); 213 } 214 finally { 215 try { 216 this.handler.afterConnectionClosed(this, status); 217 } 218 catch (Throwable ex) { 219 logger.debug("Error from WebSocketHandler.afterConnectionClosed in " + this, ex); 220 } 221 } 222 } 223 } 224 225 @Override 226 public long getTimeSinceLastActive() { 227 if (isNew()) { 228 return (System.currentTimeMillis() - this.timeCreated); 229 } 230 else { 231 return (isActive() ? 0 : System.currentTimeMillis() - this.timeLastActive); 232 } 233 } 234 235 /** 236 * Should be invoked whenever the session becomes inactive. 237 */ 238 protected void updateLastActiveTime() { 239 this.timeLastActive = System.currentTimeMillis(); 240 } 241 242 @Override 243 public void disableHeartbeat() { 244 this.heartbeatDisabled = true; 245 cancelHeartbeat(); 246 } 247 248 public void sendHeartbeat() throws SockJsTransportFailureException { 249 synchronized (this.responseLock) { 250 if (isActive() && !this.heartbeatDisabled) { 251 writeFrame(SockJsFrame.heartbeatFrame()); 252 scheduleHeartbeat(); 253 } 254 } 255 } 256 257 protected void scheduleHeartbeat() { 258 if (this.heartbeatDisabled) { 259 return; 260 } 261 synchronized (this.responseLock) { 262 cancelHeartbeat(); 263 if (!isActive()) { 264 return; 265 } 266 Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime()); 267 this.heartbeatTask = new HeartbeatTask(); 268 this.heartbeatFuture = this.config.getTaskScheduler().schedule(this.heartbeatTask, time); 269 if (logger.isTraceEnabled()) { 270 logger.trace("Scheduled heartbeat in session " + getId()); 271 } 272 } 273 } 274 275 protected void cancelHeartbeat() { 276 synchronized (this.responseLock) { 277 if (this.heartbeatFuture != null) { 278 if (logger.isTraceEnabled()) { 279 logger.trace("Cancelling heartbeat in session " + getId()); 280 } 281 this.heartbeatFuture.cancel(false); 282 this.heartbeatFuture = null; 283 } 284 if (this.heartbeatTask != null) { 285 this.heartbeatTask.cancel(); 286 this.heartbeatTask = null; 287 } 288 } 289 } 290 291 /** 292 * Polling and Streaming sessions periodically close the current HTTP request and 293 * wait for the next request to come through. During this "downtime" the session is 294 * still open but inactive and unable to send messages and therefore has to buffer 295 * them temporarily. A WebSocket session by contrast is stateful and remain active 296 * until closed. 297 */ 298 public abstract boolean isActive(); 299 300 /** 301 * Actually close the underlying WebSocket session or in the case of HTTP 302 * transports complete the underlying request. 303 */ 304 protected abstract void disconnect(CloseStatus status) throws IOException; 305 306 307 // Frame writing 308 309 /** 310 * For internal use within a TransportHandler and the (TransportHandler-specific) 311 * session class. 312 */ 313 protected void writeFrame(SockJsFrame frame) throws SockJsTransportFailureException { 314 if (logger.isTraceEnabled()) { 315 logger.trace("Preparing to write " + frame); 316 } 317 try { 318 writeFrameInternal(frame); 319 } 320 catch (Throwable ex) { 321 logWriteFrameFailure(ex); 322 try { 323 // Force disconnect (so we won't try to send close frame) 324 disconnect(CloseStatus.SERVER_ERROR); 325 } 326 catch (Throwable disconnectFailure) { 327 // Ignore 328 } 329 try { 330 close(CloseStatus.SERVER_ERROR); 331 } 332 catch (Throwable closeFailure) { 333 // Nothing of consequence, already forced disconnect 334 } 335 throw new SockJsTransportFailureException("Failed to write " + frame, getId(), ex); 336 } 337 } 338 339 protected abstract void writeFrameInternal(SockJsFrame frame) throws IOException; 340 341 private void logWriteFrameFailure(Throwable ex) { 342 if (indicatesDisconnectedClient(ex)) { 343 if (disconnectedClientLogger.isTraceEnabled()) { 344 disconnectedClientLogger.trace("Looks like the client has gone away", ex); 345 } 346 else if (disconnectedClientLogger.isDebugEnabled()) { 347 disconnectedClientLogger.debug("Looks like the client has gone away: " + ex + 348 " (For a full stack trace, set the log category '" + DISCONNECTED_CLIENT_LOG_CATEGORY + 349 "' to TRACE level.)"); 350 } 351 } 352 else { 353 logger.debug("Terminating connection after failure to send message to client", ex); 354 } 355 } 356 357 private boolean indicatesDisconnectedClient(Throwable ex) { 358 String message = NestedExceptionUtils.getMostSpecificCause(ex).getMessage(); 359 message = (message != null ? message.toLowerCase() : ""); 360 String className = ex.getClass().getSimpleName(); 361 return (message.contains("broken pipe") || DISCONNECTED_CLIENT_EXCEPTIONS.contains(className)); 362 } 363 364 365 // Delegation methods 366 367 public void delegateConnectionEstablished() throws Exception { 368 this.state = State.OPEN; 369 this.handler.afterConnectionEstablished(this); 370 } 371 372 public void delegateMessages(String... messages) throws SockJsMessageDeliveryException { 373 List<String> undelivered = new ArrayList<String>(Arrays.asList(messages)); 374 for (String message : messages) { 375 try { 376 if (isClosed()) { 377 throw new SockJsMessageDeliveryException(this.id, undelivered, "Session closed"); 378 } 379 else { 380 this.handler.handleMessage(this, new TextMessage(message)); 381 undelivered.remove(0); 382 } 383 } 384 catch (Throwable ex) { 385 throw new SockJsMessageDeliveryException(this.id, undelivered, ex); 386 } 387 } 388 } 389 390 /** 391 * Invoked when the underlying connection is closed. 392 */ 393 public final void delegateConnectionClosed(CloseStatus status) throws Exception { 394 if (!isClosed()) { 395 try { 396 updateLastActiveTime(); 397 // Avoid cancelHeartbeat() and responseLock within server "close" callback 398 ScheduledFuture<?> future = this.heartbeatFuture; 399 if (future != null) { 400 this.heartbeatFuture = null; 401 future.cancel(false); 402 } 403 } 404 finally { 405 this.state = State.CLOSED; 406 this.handler.afterConnectionClosed(this, status); 407 } 408 } 409 } 410 411 /** 412 * Close due to error arising from SockJS transport handling. 413 */ 414 public void tryCloseWithSockJsTransportError(Throwable error, CloseStatus closeStatus) { 415 if (logger.isDebugEnabled()) { 416 logger.debug("Closing due to transport error for " + this); 417 } 418 try { 419 delegateError(error); 420 } 421 catch (Throwable delegateException) { 422 // Ignore 423 logger.debug("Exception from error handling delegate", delegateException); 424 } 425 try { 426 close(closeStatus); 427 } 428 catch (Throwable closeException) { 429 logger.debug("Failure while closing " + this, closeException); 430 } 431 } 432 433 public void delegateError(Throwable ex) throws Exception { 434 this.handler.handleTransportError(this, ex); 435 } 436 437 438 // Self description 439 440 @Override 441 public String toString() { 442 return getClass().getSimpleName() + "[id=" + getId() + "]"; 443 } 444 445 446 private class HeartbeatTask implements Runnable { 447 448 private boolean expired; 449 450 @Override 451 public void run() { 452 synchronized (responseLock) { 453 if (!this.expired && !isClosed()) { 454 try { 455 sendHeartbeat(); 456 } 457 catch (Throwable ex) { 458 // Ignore: already handled in writeFrame... 459 } 460 finally { 461 this.expired = true; 462 } 463 } 464 } 465 } 466 467 void cancel() { 468 this.expired = true; 469 } 470 } 471 472}