/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.sockjs.transport.session;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.NestedExceptionUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
import org.springframework.web.socket.sockjs.transport.SockJsSession;

/**
 * An abstract base class for SockJS sessions implementing {@link SockJsSession}.
 *
 * <p>Tracks the session state (new, open, closed), the time of last activity,
 * and the scheduled heartbeat, and forwards lifecycle events, inbound messages
 * and transport errors to the configured {@link WebSocketHandler}.
 *
 * @author Rossen Stoyanchev
 * @author Sam Brannen
 * @since 4.0
 */
public abstract class AbstractSockJsSession implements SockJsSession {

	private enum State {NEW, OPEN, CLOSED}


	/**
	 * Log category to use on network IO exceptions after a client has gone away.
	 * <p>Servlet containers don't expose a client disconnected callback; see
	 * <a href="https://github.com/eclipse-ee4j/servlet-api/issues/44">eclipse-ee4j/servlet-api#44</a>.
	 * Therefore network IO failures may occur simply because a client has gone away,
	 * and that can fill the logs with unnecessary stack traces.
	 * <p>We make a best effort to identify such network failures, on a per-server
	 * basis, and log them under a separate log category. A simple one-line message
	 * is logged at DEBUG level, while a full stack trace is shown at TRACE level.
	 * @see #disconnectedClientLogger
	 */
	public static final String DISCONNECTED_CLIENT_LOG_CATEGORY =
			"org.springframework.web.socket.sockjs.DisconnectedClient";

	/**
	 * Tomcat: ClientAbortException or EOFException
	 * Jetty: EofException
	 * WildFly, GlassFish: java.io.IOException "Broken pipe" (already covered)
	 * <p>TODO:
	 * This definition is currently duplicated between HttpWebHandlerAdapter
	 * and AbstractSockJsSession. It is a candidate for a common utility class.
	 * @see #indicatesDisconnectedClient(Throwable)
	 */
	private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS =
			new HashSet<>(Arrays.asList("ClientAbortException", "EOFException", "EofException"));


	/**
	 * Separate logger to use on network IO failure after a client has gone away.
	 * @see #DISCONNECTED_CLIENT_LOG_CATEGORY
	 */
	protected static final Log disconnectedClientLogger = LogFactory.getLog(DISCONNECTED_CLIENT_LOG_CATEGORY);

	protected final Log logger = LogFactory.getLog(getClass());

	/** Monitor guarding heartbeat scheduling/cancellation and heartbeat frame writes. */
	protected final Object responseLock = new Object();

	private final String id;

	private final SockJsServiceConfig config;

	private final WebSocketHandler handler;

	private final Map<String, Object> attributes = new ConcurrentHashMap<>();

	private volatile State state = State.NEW;

	private final long timeCreated = System.currentTimeMillis();

	private volatile long timeLastActive = this.timeCreated;

	@Nullable
	private ScheduledFuture<?> heartbeatFuture;

	@Nullable
	private HeartbeatTask heartbeatTask;

	private volatile boolean heartbeatDisabled;


	/**
	 * Create a new instance.
	 * @param id the session ID
	 * @param config the SockJS service configuration options
	 * @param handler the recipient of SockJS messages
	 * @param attributes the attributes from the HTTP handshake to associate with the WebSocket
	 * session; the provided attributes are copied, the original map is not used.
	 */
	public AbstractSockJsSession(String id, SockJsServiceConfig config, WebSocketHandler handler,
			@Nullable Map<String, Object> attributes) {

		Assert.notNull(id, "Session id must not be null");
		Assert.notNull(config, "SockJsServiceConfig must not be null");
		Assert.notNull(handler, "WebSocketHandler must not be null");

		this.id = id;
		this.config = config;
		this.handler = handler;

		if (attributes != null) {
			this.attributes.putAll(attributes);
		}
	}


	@Override
	public String getId() {
		return this.id;
	}

	/**
	 * Return the message codec obtained from the SockJS service configuration.
	 */
	protected SockJsMessageCodec getMessageCodec() {
		return this.config.getMessageCodec();
	}

	/**
	 * Return the SockJS service configuration this session was created with.
	 */
	public SockJsServiceConfig getSockJsServiceConfig() {
		return this.config;
	}

	/**
	 * Return the session attributes. The returned map is the session's own
	 * (concurrent) map, not a copy, so changes to it are visible to the session.
	 */
	@Override
	public Map<String, Object> getAttributes() {
		return this.attributes;
	}


	// Message sending

	/**
	 * Send the payload of the given text message via
	 * {@link #sendMessageInternal(String)}.
	 * <p>The session must not be closed and the message must be a
	 * {@link TextMessage}; both preconditions are enforced via {@link Assert}.
	 * @param message the message to send
	 * @throws IOException if raised by {@link #sendMessageInternal(String)}
	 */
	@Override
	public final void sendMessage(WebSocketMessage<?> message) throws IOException {
		Assert.state(!isClosed(), "Cannot send a message when session is closed");
		Assert.isInstanceOf(TextMessage.class, message, "SockJS supports text messages only");
		sendMessageInternal(((TextMessage) message).getPayload());
	}

	/**
	 * Transport-specific sending of a text message payload.
	 * @param message the text payload to send
	 * @throws IOException in case of a failure to send
	 */
	protected abstract void sendMessageInternal(String message) throws IOException;


	// Lifecycle related methods

	/**
	 * Whether the session is still in its initial state, i.e.
	 * {@link #delegateConnectionEstablished()} has not been invoked yet
	 * and the session has not been closed.
	 */
	public boolean isNew() {
		return State.NEW.equals(this.state);
	}

	@Override
	public boolean isOpen() {
		return State.OPEN.equals(this.state);
	}

	/**
	 * Whether the session has been closed.
	 */
	public boolean isClosed() {
		return State.CLOSED.equals(this.state);
	}

	/**
	 * Perform cleanup and notify the {@link WebSocketHandler}, using a close
	 * status with code 3000 and reason "Go away!".
	 */
	@Override
	public final void close() throws IOException {
		close(new CloseStatus(3000, "Go away!"));
	}

	/**
	 * Perform cleanup and notify the {@link WebSocketHandler}.
	 * <p>This is a no-op unless the session is currently open. Otherwise the
	 * session is marked closed, a close frame is written on a best-effort basis
	 * (if the session is active and the status is not
	 * {@link CloseStatus#SESSION_NOT_RELIABLE}), the heartbeat is cancelled, and
	 * the underlying connection is disconnected. The handler is notified via
	 * {@code afterConnectionClosed} even if any of those steps fail.
	 * @param status the close status to send and to report to the handler
	 * @throws IOException if raised by {@link #disconnect(CloseStatus)}
	 */
	@Override
	public final void close(CloseStatus status) throws IOException {
		if (isOpen()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Closing SockJS session " + getId() + " with " + status);
			}
			// Flip the state first so that concurrent senders see the session as closed
			this.state = State.CLOSED;
			try {
				if (isActive() && !CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
					try {
						writeFrameInternal(SockJsFrame.closeFrame(status.getCode(), status.getReason()));
					}
					catch (Throwable ex) {
						// Best effort only: proceed with the rest of the cleanup
						logger.debug("Failure while sending SockJS close frame", ex);
					}
				}
				updateLastActiveTime();
				cancelHeartbeat();
				disconnect(status);
			}
			finally {
				try {
					this.handler.afterConnectionClosed(this, status);
				}
				catch (Throwable ex) {
					logger.debug("Error from WebSocketHandler.afterConnectionClosed in " + this, ex);
				}
			}
		}
	}

	/**
	 * Return the time in milliseconds since the session was last active: time
	 * since creation for a new session, 0 for a currently active session, or
	 * otherwise the time since {@link #updateLastActiveTime()} was last called.
	 */
	@Override
	public long getTimeSinceLastActive() {
		if (isNew()) {
			return (System.currentTimeMillis() - this.timeCreated);
		}
		else {
			return (isActive() ? 0 : System.currentTimeMillis() - this.timeLastActive);
		}
	}

	/**
	 * Should be invoked whenever the session becomes inactive.
	 */
	protected void updateLastActiveTime() {
		this.timeLastActive = System.currentTimeMillis();
	}

	/**
	 * Disable heartbeats for this session and cancel any heartbeat that is
	 * currently scheduled.
	 */
	@Override
	public void disableHeartbeat() {
		this.heartbeatDisabled = true;
		cancelHeartbeat();
	}

	/**
	 * Write a heartbeat frame and schedule the next heartbeat, provided the
	 * session is active and heartbeats have not been disabled.
	 * @throws SockJsTransportFailureException if writing the frame fails
	 */
	protected void sendHeartbeat() throws SockJsTransportFailureException {
		synchronized (this.responseLock) {
			if (isActive() && !this.heartbeatDisabled) {
				writeFrame(SockJsFrame.heartbeatFrame());
				scheduleHeartbeat();
			}
		}
	}

	/**
	 * Cancel any pending heartbeat and, if the session is active and heartbeats
	 * are not disabled, schedule a new one after the configured heartbeat time.
	 */
	protected void scheduleHeartbeat() {
		if (this.heartbeatDisabled) {
			return;
		}
		synchronized (this.responseLock) {
			cancelHeartbeat();
			if (!isActive()) {
				return;
			}
			Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
			this.heartbeatTask = new HeartbeatTask();
			this.heartbeatFuture = this.config.getTaskScheduler().schedule(this.heartbeatTask, time);
			if (logger.isTraceEnabled()) {
				logger.trace("Scheduled heartbeat in session " + getId());
			}
		}
	}

	/**
	 * Cancel the scheduled heartbeat, if any, and mark the associated task as
	 * expired so that it does nothing should it still get to run.
	 */
	protected void cancelHeartbeat() {
		synchronized (this.responseLock) {
			if (this.heartbeatFuture != null) {
				if (logger.isTraceEnabled()) {
					logger.trace("Cancelling heartbeat in session " + getId());
				}
				// Do not interrupt a heartbeat that is already running
				this.heartbeatFuture.cancel(false);
				this.heartbeatFuture = null;
			}
			if (this.heartbeatTask != null) {
				this.heartbeatTask.cancel();
				this.heartbeatTask = null;
			}
		}
	}

	/**
	 * Polling and Streaming sessions periodically close the current HTTP request and
	 * wait for the next request to come through. During this "downtime" the session is
	 * still open but inactive and unable to send messages and therefore has to buffer
	 * them temporarily. A WebSocket session by contrast is stateful and remains active
	 * until closed.
	 */
	public abstract boolean isActive();

	/**
	 * Actually close the underlying WebSocket session or in the case of HTTP
	 * transports complete the underlying request.
	 */
	protected abstract void disconnect(CloseStatus status) throws IOException;


	// Frame writing

	/**
	 * For internal use within a TransportHandler and the (TransportHandler-specific)
	 * session class.
	 * <p>On failure to write, the failure is logged, the connection is forcibly
	 * disconnected, the session is closed with {@link CloseStatus#SERVER_ERROR},
	 * and a {@link SockJsTransportFailureException} wrapping the cause is thrown.
	 * @param frame the frame to write
	 * @throws SockJsTransportFailureException if the frame could not be written
	 */
	protected void writeFrame(SockJsFrame frame) throws SockJsTransportFailureException {
		if (logger.isTraceEnabled()) {
			logger.trace("Preparing to write " + frame);
		}
		try {
			writeFrameInternal(frame);
		}
		catch (Exception ex) {
			logWriteFrameFailure(ex);
			try {
				// Force disconnect (so we won't try to send close frame)
				disconnect(CloseStatus.SERVER_ERROR);
			}
			catch (Throwable disconnectFailure) {
				// Ignore
			}
			try {
				close(CloseStatus.SERVER_ERROR);
			}
			catch (Throwable closeFailure) {
				// Nothing of consequence, already forced disconnect
			}
			throw new SockJsTransportFailureException("Failed to write " + frame, getId(), ex);
		}
	}

	/**
	 * Transport-specific writing of a SockJS frame.
	 * @param frame the frame to write
	 * @throws IOException in case of a failure to write
	 */
	protected abstract void writeFrameInternal(SockJsFrame frame) throws IOException;

	/**
	 * Log a frame write failure. Failures that look like a client disconnect go
	 * to {@link #disconnectedClientLogger} (stack trace at TRACE, one-line
	 * message at DEBUG); anything else goes to the regular logger at DEBUG.
	 */
	private void logWriteFrameFailure(Throwable ex) {
		if (indicatesDisconnectedClient(ex)) {
			if (disconnectedClientLogger.isTraceEnabled()) {
				disconnectedClientLogger.trace("Looks like the client has gone away", ex);
			}
			else if (disconnectedClientLogger.isDebugEnabled()) {
				disconnectedClientLogger.debug("Looks like the client has gone away: " + ex +
						" (For a full stack trace, set the log category '" + DISCONNECTED_CLIENT_LOG_CATEGORY +
						"' to TRACE level.)");
			}
		}
		else {
			logger.debug("Terminating connection after failure to send message to client", ex);
		}
	}

	/**
	 * Best-effort check whether the given failure is the result of the client
	 * having gone away: either the most specific cause has a message containing
	 * "broken pipe" (case-insensitive), or the simple class name of the given
	 * exception is one of the known disconnected-client exception names.
	 */
	private boolean indicatesDisconnectedClient(Throwable ex) {
		String message = NestedExceptionUtils.getMostSpecificCause(ex).getMessage();
		// Locale.ROOT keeps the match independent of the default locale
		// (e.g. Turkish maps 'I' to dotless 'ı', which would defeat the comparison)
		message = (message != null ? message.toLowerCase(Locale.ROOT) : "");
		String className = ex.getClass().getSimpleName();
		return (message.contains("broken pipe") || DISCONNECTED_CLIENT_EXCEPTIONS.contains(className));
	}


	// Delegation methods

	/**
	 * Mark the session as open and notify the {@link WebSocketHandler} that the
	 * connection has been established.
	 * @throws Exception if raised by the handler
	 */
	public void delegateConnectionEstablished() throws Exception {
		this.state = State.OPEN;
		this.handler.afterConnectionEstablished(this);
	}

	/**
	 * Pass the given inbound messages to the {@link WebSocketHandler}, one at a
	 * time and in order.
	 * <p>If the session is found to be closed, before a message or after a
	 * handler failure, the remaining messages are dropped (and logged at TRACE
	 * level) without raising an exception.
	 * @param messages the message payloads to deliver
	 * @throws SockJsMessageDeliveryException if the handler fails while the
	 * session is not closed; carries the failed and remaining non-blank messages
	 */
	public void delegateMessages(String... messages) throws SockJsMessageDeliveryException {
		for (int i = 0; i < messages.length; i++) {
			try {
				if (isClosed()) {
					logUndeliveredMessages(i, messages);
					return;
				}
				this.handler.handleMessage(this, new TextMessage(messages[i]));
			}
			catch (Exception ex) {
				if (isClosed()) {
					if (logger.isTraceEnabled()) {
						logger.trace("Failed to handle message '" + messages[i] + "'", ex);
					}
					logUndeliveredMessages(i, messages);
					return;
				}
				throw new SockJsMessageDeliveryException(this.id, getUndelivered(messages, i), ex);
			}
		}
	}

	/**
	 * Log, at TRACE level, the non-blank messages from the given index onwards
	 * as dropped due to the session being closed.
	 */
	private void logUndeliveredMessages(int index, String[] messages) {
		List<String> undelivered = getUndelivered(messages, index);
		if (logger.isTraceEnabled() && !undelivered.isEmpty()) {
			logger.trace("Dropped inbound message(s) due to closed session: " + undelivered);
		}
	}

	/**
	 * Return the messages from index {@code i} (inclusive) to the end of the
	 * array, leaving out those that are empty after trimming.
	 */
	private static List<String> getUndelivered(String[] messages, int i) {
		switch (messages.length - i) {
			case 0:
				return Collections.emptyList();
			case 1:
				return (messages[i].trim().isEmpty() ?
						Collections.emptyList() : Collections.singletonList(messages[i]));
			default:
				return Arrays.stream(Arrays.copyOfRange(messages, i, messages.length))
						.filter(message -> !message.trim().isEmpty())
						.collect(Collectors.toList());
		}
	}

	/**
	 * Invoked when the underlying connection is closed.
	 * <p>Unless the session is already closed, the last-active time is updated,
	 * a scheduled heartbeat is cancelled, the session is marked closed, and the
	 * {@link WebSocketHandler} is notified.
	 * @param status the close status to report to the handler
	 * @throws Exception if raised by the handler
	 */
	public final void delegateConnectionClosed(CloseStatus status) throws Exception {
		if (!isClosed()) {
			try {
				updateLastActiveTime();
				// Avoid cancelHeartbeat() and responseLock within server "close" callback
				ScheduledFuture<?> future = this.heartbeatFuture;
				if (future != null) {
					this.heartbeatFuture = null;
					future.cancel(false);
				}
			}
			finally {
				this.state = State.CLOSED;
				this.handler.afterConnectionClosed(this, status);
			}
		}
	}

	/**
	 * Close due to error arising from SockJS transport handling.
	 * <p>The error is first passed to the {@link WebSocketHandler} and then the
	 * session is closed with the given status. Failures from either step are
	 * logged at DEBUG level and not propagated.
	 * @param error the transport error
	 * @param closeStatus the status to close the session with
	 */
	public void tryCloseWithSockJsTransportError(Throwable error, CloseStatus closeStatus) {
		if (logger.isDebugEnabled()) {
			logger.debug("Closing due to transport error for " + this);
		}
		try {
			delegateError(error);
		}
		catch (Throwable delegateException) {
			// Ignore
			logger.debug("Exception from error handling delegate", delegateException);
		}
		try {
			close(closeStatus);
		}
		catch (Throwable closeException) {
			logger.debug("Failure while closing " + this, closeException);
		}
	}

	/**
	 * Pass the given transport error to the {@link WebSocketHandler}.
	 * @param ex the error to report
	 * @throws Exception if raised by the handler
	 */
	public void delegateError(Throwable ex) throws Exception {
		this.handler.handleTransportError(this, ex);
	}


	// Self description

	@Override
	public String toString() {
		return getClass().getSimpleName() + "[id=" + getId() + "]";
	}


	/**
	 * Scheduled task that sends a single heartbeat, unless it has been
	 * cancelled or the session has been closed by the time it runs.
	 */
	private class HeartbeatTask implements Runnable {

		private boolean expired;

		@Override
		public void run() {
			synchronized (responseLock) {
				if (!this.expired && !isClosed()) {
					try {
						sendHeartbeat();
					}
					catch (Throwable ex) {
						// Ignore: already handled in writeFrame...
					}
					finally {
						this.expired = true;
					}
				}
			}
		}

		/**
		 * Mark this task as expired so that {@link #run()} becomes a no-op.
		 */
		void cancel() {
			this.expired = true;
		}
	}

}