001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.transport.session;
018
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033
034import org.springframework.core.NestedExceptionUtils;
035import org.springframework.lang.Nullable;
036import org.springframework.util.Assert;
037import org.springframework.web.socket.CloseStatus;
038import org.springframework.web.socket.TextMessage;
039import org.springframework.web.socket.WebSocketHandler;
040import org.springframework.web.socket.WebSocketMessage;
041import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException;
042import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
043import org.springframework.web.socket.sockjs.frame.SockJsFrame;
044import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
045import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
046import org.springframework.web.socket.sockjs.transport.SockJsSession;
047
048/**
049 * An abstract base class for SockJS sessions implementing {@link SockJsSession}.
050 *
051 * @author Rossen Stoyanchev
052 * @author Sam Brannen
053 * @since 4.0
054 */
055public abstract class AbstractSockJsSession implements SockJsSession {
056
057        private enum State {NEW, OPEN, CLOSED}
058
059
060        /**
061         * Log category to use on network IO exceptions after a client has gone away.
062         * <p>Servlet containers don't expose a client disconnected callback; see
063         * <a href="https://github.com/eclipse-ee4j/servlet-api/issues/44">eclipse-ee4j/servlet-api#44</a>.
064         * Therefore network IO failures may occur simply because a client has gone away,
065         * and that can fill the logs with unnecessary stack traces.
066         * <p>We make a best effort to identify such network failures, on a per-server
067         * basis, and log them under a separate log category. A simple one-line message
068         * is logged at DEBUG level, while a full stack trace is shown at TRACE level.
069         * @see #disconnectedClientLogger
070         */
071        public static final String DISCONNECTED_CLIENT_LOG_CATEGORY =
072                        "org.springframework.web.socket.sockjs.DisconnectedClient";
073
074        /**
075         * Tomcat: ClientAbortException or EOFException
076         * Jetty: EofException
077         * WildFly, GlassFish: java.io.IOException "Broken pipe" (already covered)
078         * <p>TODO:
079         * This definition is currently duplicated between HttpWebHandlerAdapter
080         * and AbstractSockJsSession. It is a candidate for a common utility class.
081         * @see #indicatesDisconnectedClient(Throwable)
082         */
083        private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS =
084                        new HashSet<>(Arrays.asList("ClientAbortException", "EOFException", "EofException"));
085
086
087        /**
088         * Separate logger to use on network IO failure after a client has gone away.
089         * @see #DISCONNECTED_CLIENT_LOG_CATEGORY
090         */
091        protected static final Log disconnectedClientLogger = LogFactory.getLog(DISCONNECTED_CLIENT_LOG_CATEGORY);
092
093        protected final Log logger = LogFactory.getLog(getClass());
094
095        protected final Object responseLock = new Object();
096
097        private final String id;
098
099        private final SockJsServiceConfig config;
100
101        private final WebSocketHandler handler;
102
103        private final Map<String, Object> attributes = new ConcurrentHashMap<>();
104
105        private volatile State state = State.NEW;
106
107        private final long timeCreated = System.currentTimeMillis();
108
109        private volatile long timeLastActive = this.timeCreated;
110
111        @Nullable
112        private ScheduledFuture<?> heartbeatFuture;
113
114        @Nullable
115        private HeartbeatTask heartbeatTask;
116
117        private volatile boolean heartbeatDisabled;
118
119
120        /**
121         * Create a new instance.
122         * @param id the session ID
123         * @param config the SockJS service configuration options
124         * @param handler the recipient of SockJS messages
125         * @param attributes the attributes from the HTTP handshake to associate with the WebSocket
126         * session; the provided attributes are copied, the original map is not used.
127         */
128        public AbstractSockJsSession(String id, SockJsServiceConfig config, WebSocketHandler handler,
129                        @Nullable Map<String, Object> attributes) {
130
131                Assert.notNull(id, "Session id must not be null");
132                Assert.notNull(config, "SockJsServiceConfig must not be null");
133                Assert.notNull(handler, "WebSocketHandler must not be null");
134
135                this.id = id;
136                this.config = config;
137                this.handler = handler;
138
139                if (attributes != null) {
140                        this.attributes.putAll(attributes);
141                }
142        }
143
144
145        @Override
146        public String getId() {
147                return this.id;
148        }
149
150        protected SockJsMessageCodec getMessageCodec() {
151                return this.config.getMessageCodec();
152        }
153
154        public SockJsServiceConfig getSockJsServiceConfig() {
155                return this.config;
156        }
157
158        @Override
159        public Map<String, Object> getAttributes() {
160                return this.attributes;
161        }
162
163
164        // Message sending
165
166        @Override
167        public final void sendMessage(WebSocketMessage<?> message) throws IOException {
168                Assert.state(!isClosed(), "Cannot send a message when session is closed");
169                Assert.isInstanceOf(TextMessage.class, message, "SockJS supports text messages only");
170                sendMessageInternal(((TextMessage) message).getPayload());
171        }
172
        /**
         * Send the given text payload to the client.
         * <p>Invoked from {@link #sendMessage(WebSocketMessage)} with the payload of the
         * {@link TextMessage}, after it has verified that the session is not closed.
         * @param message the text payload to send
         * @throws IOException declared for implementations to signal a send failure
         */
        protected abstract void sendMessageInternal(String message) throws IOException;
174
175
176        // Lifecycle related methods
177
178        public boolean isNew() {
179                return State.NEW.equals(this.state);
180        }
181
182        @Override
183        public boolean isOpen() {
184                return State.OPEN.equals(this.state);
185        }
186
187        public boolean isClosed() {
188                return State.CLOSED.equals(this.state);
189        }
190
191        /**
192         * Performs cleanup and notify the {@link WebSocketHandler}.
193         */
194        @Override
195        public final void close() throws IOException {
196                close(new CloseStatus(3000, "Go away!"));
197        }
198
199        /**
200         * Performs cleanup and notify the {@link WebSocketHandler}.
201         */
202        @Override
203        public final void close(CloseStatus status) throws IOException {
204                if (isOpen()) {
205                        if (logger.isDebugEnabled()) {
206                                logger.debug("Closing SockJS session " + getId() + " with " + status);
207                        }
208                        this.state = State.CLOSED;
209                        try {
210                                if (isActive() && !CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
211                                        try {
212                                                writeFrameInternal(SockJsFrame.closeFrame(status.getCode(), status.getReason()));
213                                        }
214                                        catch (Throwable ex) {
215                                                logger.debug("Failure while sending SockJS close frame", ex);
216                                        }
217                                }
218                                updateLastActiveTime();
219                                cancelHeartbeat();
220                                disconnect(status);
221                        }
222                        finally {
223                                try {
224                                        this.handler.afterConnectionClosed(this, status);
225                                }
226                                catch (Throwable ex) {
227                                        logger.debug("Error from WebSocketHandler.afterConnectionClosed in " + this, ex);
228                                }
229                        }
230                }
231        }
232
233        @Override
234        public long getTimeSinceLastActive() {
235                if (isNew()) {
236                        return (System.currentTimeMillis() - this.timeCreated);
237                }
238                else {
239                        return (isActive() ? 0 : System.currentTimeMillis() - this.timeLastActive);
240                }
241        }
242
243        /**
244         * Should be invoked whenever the session becomes inactive.
245         */
246        protected void updateLastActiveTime() {
247                this.timeLastActive = System.currentTimeMillis();
248        }
249
250        @Override
251        public void disableHeartbeat() {
252                this.heartbeatDisabled = true;
253                cancelHeartbeat();
254        }
255
256        protected void sendHeartbeat() throws SockJsTransportFailureException {
257                synchronized (this.responseLock) {
258                        if (isActive() && !this.heartbeatDisabled) {
259                                writeFrame(SockJsFrame.heartbeatFrame());
260                                scheduleHeartbeat();
261                        }
262                }
263        }
264
265        protected void scheduleHeartbeat() {
266                if (this.heartbeatDisabled) {
267                        return;
268                }
269                synchronized (this.responseLock) {
270                        cancelHeartbeat();
271                        if (!isActive()) {
272                                return;
273                        }
274                        Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
275                        this.heartbeatTask = new HeartbeatTask();
276                        this.heartbeatFuture = this.config.getTaskScheduler().schedule(this.heartbeatTask, time);
277                        if (logger.isTraceEnabled()) {
278                                logger.trace("Scheduled heartbeat in session " + getId());
279                        }
280                }
281        }
282
283        protected void cancelHeartbeat() {
284                synchronized (this.responseLock) {
285                        if (this.heartbeatFuture != null) {
286                                if (logger.isTraceEnabled()) {
287                                        logger.trace("Cancelling heartbeat in session " + getId());
288                                }
289                                this.heartbeatFuture.cancel(false);
290                                this.heartbeatFuture = null;
291                        }
292                        if (this.heartbeatTask != null) {
293                                this.heartbeatTask.cancel();
294                                this.heartbeatTask = null;
295                        }
296                }
297        }
298
        /**
         * Whether the session can currently write to the client.
         * <p>Polling and Streaming sessions periodically close the current HTTP request and
         * wait for the next request to come through. During this "downtime" the session is
         * still open but inactive and unable to send messages and therefore has to buffer
         * them temporarily. A WebSocket session by contrast is stateful and remains active
         * until closed.
         * @return {@code true} if the session is active
         */
        public abstract boolean isActive();
307
        /**
         * Actually close the underlying WebSocket session or in the case of HTTP
         * transports complete the underlying request.
         * <p>Invoked from {@link #close(CloseStatus)} after the heartbeat has been
         * cancelled, and from {@link #writeFrame(SockJsFrame)} with
         * {@link CloseStatus#SERVER_ERROR} after a write failure.
         * @param status the status the session is being closed with
         * @throws IOException declared for implementations to signal a failure to disconnect
         */
        protected abstract void disconnect(CloseStatus status) throws IOException;
313
314
315        // Frame writing
316
317        /**
318         * For internal use within a TransportHandler and the (TransportHandler-specific)
319         * session class.
320         */
321        protected void writeFrame(SockJsFrame frame) throws SockJsTransportFailureException {
322                if (logger.isTraceEnabled()) {
323                        logger.trace("Preparing to write " + frame);
324                }
325                try {
326                        writeFrameInternal(frame);
327                }
328                catch (Exception ex) {
329                        logWriteFrameFailure(ex);
330                        try {
331                                // Force disconnect (so we won't try to send close frame)
332                                disconnect(CloseStatus.SERVER_ERROR);
333                        }
334                        catch (Throwable disconnectFailure) {
335                                // Ignore
336                        }
337                        try {
338                                close(CloseStatus.SERVER_ERROR);
339                        }
340                        catch (Throwable closeFailure) {
341                                // Nothing of consequence, already forced disconnect
342                        }
343                        throw new SockJsTransportFailureException("Failed to write " + frame, getId(), ex);
344                }
345        }
346
        /**
         * Write the given frame to the client.
         * <p>Invoked from {@link #writeFrame(SockJsFrame)}, which logs a failure and
         * then disconnects and closes the session, and directly from
         * {@link #close(CloseStatus)} to send the close frame on a best-effort basis.
         * @param frame the frame to write
         * @throws IOException declared for implementations to signal a write failure
         */
        protected abstract void writeFrameInternal(SockJsFrame frame) throws IOException;
348
349        private void logWriteFrameFailure(Throwable ex) {
350                if (indicatesDisconnectedClient(ex)) {
351                        if (disconnectedClientLogger.isTraceEnabled()) {
352                                disconnectedClientLogger.trace("Looks like the client has gone away", ex);
353                        }
354                        else if (disconnectedClientLogger.isDebugEnabled()) {
355                                disconnectedClientLogger.debug("Looks like the client has gone away: " + ex +
356                                                " (For a full stack trace, set the log category '" + DISCONNECTED_CLIENT_LOG_CATEGORY +
357                                                "' to TRACE level.)");
358                        }
359                }
360                else {
361                        logger.debug("Terminating connection after failure to send message to client", ex);
362                }
363        }
364
365        private boolean indicatesDisconnectedClient(Throwable ex)  {
366                String message = NestedExceptionUtils.getMostSpecificCause(ex).getMessage();
367                message = (message != null ? message.toLowerCase() : "");
368                String className = ex.getClass().getSimpleName();
369                return (message.contains("broken pipe") || DISCONNECTED_CLIENT_EXCEPTIONS.contains(className));
370        }
371
372
373        // Delegation methods
374
375        public void delegateConnectionEstablished() throws Exception {
376                this.state = State.OPEN;
377                this.handler.afterConnectionEstablished(this);
378        }
379
380        public void delegateMessages(String... messages) throws SockJsMessageDeliveryException {
381                for (int i = 0; i < messages.length; i++) {
382                        try {
383                                if (isClosed()) {
384                                        logUndeliveredMessages(i, messages);
385                                        return;
386                                }
387                                this.handler.handleMessage(this, new TextMessage(messages[i]));
388                        }
389                        catch (Exception ex) {
390                                if (isClosed()) {
391                                        if (logger.isTraceEnabled()) {
392                                                logger.trace("Failed to handle message '" + messages[i] + "'", ex);
393                                        }
394                                        logUndeliveredMessages(i, messages);
395                                        return;
396                                }
397                                throw new SockJsMessageDeliveryException(this.id, getUndelivered(messages, i), ex);
398                        }
399                }
400        }
401
402        private void logUndeliveredMessages(int index, String[] messages) {
403                List<String> undelivered = getUndelivered(messages, index);
404                if (logger.isTraceEnabled() && !undelivered.isEmpty()) {
405                        logger.trace("Dropped inbound message(s) due to closed session: " + undelivered);
406                }
407        }
408
409        private static List<String> getUndelivered(String[] messages, int i) {
410                switch (messages.length - i) {
411                        case 0:
412                                return Collections.emptyList();
413                        case 1:
414                                return (messages[i].trim().isEmpty() ?
415                                                Collections.emptyList() : Collections.singletonList(messages[i]));
416                        default:
417                                return Arrays.stream(Arrays.copyOfRange(messages, i, messages.length))
418                                                .filter(message -> !message.trim().isEmpty())
419                                                .collect(Collectors.toList());
420                }
421        }
422
423        /**
424         * Invoked when the underlying connection is closed.
425         */
426        public final void delegateConnectionClosed(CloseStatus status) throws Exception {
427                if (!isClosed()) {
428                        try {
429                                updateLastActiveTime();
430                                // Avoid cancelHeartbeat() and responseLock within server "close" callback
431                                ScheduledFuture<?> future = this.heartbeatFuture;
432                                if (future != null) {
433                                        this.heartbeatFuture = null;
434                                        future.cancel(false);
435                                }
436                        }
437                        finally {
438                                this.state = State.CLOSED;
439                                this.handler.afterConnectionClosed(this, status);
440                        }
441                }
442        }
443
444        /**
445         * Close due to error arising from SockJS transport handling.
446         */
447        public void tryCloseWithSockJsTransportError(Throwable error, CloseStatus closeStatus) {
448                if (logger.isDebugEnabled()) {
449                        logger.debug("Closing due to transport error for " + this);
450                }
451                try {
452                        delegateError(error);
453                }
454                catch (Throwable delegateException) {
455                        // Ignore
456                        logger.debug("Exception from error handling delegate", delegateException);
457                }
458                try {
459                        close(closeStatus);
460                }
461                catch (Throwable closeException) {
462                        logger.debug("Failure while closing " + this, closeException);
463                }
464        }
465
466        public void delegateError(Throwable ex) throws Exception {
467                this.handler.handleTransportError(this, ex);
468        }
469
470
471        // Self description
472
473        @Override
474        public String toString() {
475                return getClass().getSimpleName() + "[id=" + getId() + "]";
476        }
477
478
479        private class HeartbeatTask implements Runnable {
480
481                private boolean expired;
482
483                @Override
484                public void run() {
485                        synchronized (responseLock) {
486                                if (!this.expired && !isClosed()) {
487                                        try {
488                                                sendHeartbeat();
489                                        }
490                                        catch (Throwable ex) {
491                                                // Ignore: already handled in writeFrame...
492                                        }
493                                        finally {
494                                                this.expired = true;
495                                        }
496                                }
497                        }
498                }
499
500                void cancel() {
501                        this.expired = true;
502                }
503        }
504
505}