001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.transport.session;
018
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.NestedExceptionUtils;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
import org.springframework.web.socket.sockjs.transport.SockJsSession;

046/**
047 * An abstract base class for SockJS sessions implementing {@link SockJsSession}.
048 *
049 * @author Rossen Stoyanchev
050 * @author Sam Brannen
051 * @since 4.0
052 */
053public abstract class AbstractSockJsSession implements SockJsSession {
054
055        private enum State {NEW, OPEN, CLOSED}
056
057
058        /**
059         * Log category to use on network IO exceptions after a client has gone away.
060         * <p>The Servlet API does not provide notifications when a client disconnects;
061         * see <a href="https://java.net/jira/browse/SERVLET_SPEC-44">SERVLET_SPEC-44</a>.
062         * Therefore network IO failures may occur simply because a client has gone away,
063         * and that can fill the logs with unnecessary stack traces.
064         * <p>We make a best effort to identify such network failures, on a per-server
065         * basis, and log them under a separate log category. A simple one-line message
066         * is logged at DEBUG level, while a full stack trace is shown at TRACE level.
067         * @see #disconnectedClientLogger
068         */
069        public static final String DISCONNECTED_CLIENT_LOG_CATEGORY =
070                        "org.springframework.web.socket.sockjs.DisconnectedClient";
071
072        /**
073         * Tomcat: ClientAbortException or EOFException
074         * Jetty: EofException
075         * WildFly, GlassFish: java.io.IOException "Broken pipe" (already covered)
076         * @see #indicatesDisconnectedClient(Throwable)
077         */
078        private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS =
079                        new HashSet<String>(Arrays.asList("ClientAbortException", "EOFException", "EofException"));
080
081
082        /**
083         * Separate logger to use on network IO failure after a client has gone away.
084         * @see #DISCONNECTED_CLIENT_LOG_CATEGORY
085         */
086        protected static final Log disconnectedClientLogger = LogFactory.getLog(DISCONNECTED_CLIENT_LOG_CATEGORY);
087
088        protected final Log logger = LogFactory.getLog(getClass());
089
090        protected final Object responseLock = new Object();
091
092        private final String id;
093
094        private final SockJsServiceConfig config;
095
096        private final WebSocketHandler handler;
097
098        private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
099
100        private volatile State state = State.NEW;
101
102        private final long timeCreated = System.currentTimeMillis();
103
104        private volatile long timeLastActive = this.timeCreated;
105
106        private ScheduledFuture<?> heartbeatFuture;
107
108        private HeartbeatTask heartbeatTask;
109
110        private volatile boolean heartbeatDisabled;
111
112
113        /**
114         * Create a new instance.
115         * @param id the session ID
116         * @param config SockJS service configuration options
117         * @param handler the recipient of SockJS messages
118         * @param attributes attributes from the HTTP handshake to associate with the WebSocket
119         * session; the provided attributes are copied, the original map is not used.
120         */
121        public AbstractSockJsSession(String id, SockJsServiceConfig config, WebSocketHandler handler,
122                        Map<String, Object> attributes) {
123
124                Assert.notNull(id, "Session id must not be null");
125                Assert.notNull(config, "SockJsServiceConfig must not be null");
126                Assert.notNull(handler, "WebSocketHandler must not be null");
127
128                this.id = id;
129                this.config = config;
130                this.handler = handler;
131
132                if (attributes != null) {
133                        this.attributes.putAll(attributes);
134                }
135        }
136
137
138        @Override
139        public String getId() {
140                return this.id;
141        }
142
143        protected SockJsMessageCodec getMessageCodec() {
144                return this.config.getMessageCodec();
145        }
146
147        public SockJsServiceConfig getSockJsServiceConfig() {
148                return this.config;
149        }
150
151        @Override
152        public Map<String, Object> getAttributes() {
153                return this.attributes;
154        }
155
156
157        // Message sending
158
159        public final void sendMessage(WebSocketMessage<?> message) throws IOException {
160                Assert.state(!isClosed(), "Cannot send a message when session is closed");
161                Assert.isInstanceOf(TextMessage.class, message, "SockJS supports text messages only");
162                sendMessageInternal(((TextMessage) message).getPayload());
163        }
164
165        protected abstract void sendMessageInternal(String message) throws IOException;
166
167
168        // Lifecycle related methods
169
170        public boolean isNew() {
171                return State.NEW.equals(this.state);
172        }
173
174        @Override
175        public boolean isOpen() {
176                return State.OPEN.equals(this.state);
177        }
178
179        public boolean isClosed() {
180                return State.CLOSED.equals(this.state);
181        }
182
183        /**
184         * Performs cleanup and notify the {@link WebSocketHandler}.
185         */
186        @Override
187        public final void close() throws IOException {
188                close(new CloseStatus(3000, "Go away!"));
189        }
190
191        /**
192         * Performs cleanup and notify the {@link WebSocketHandler}.
193         */
194        @Override
195        public final void close(CloseStatus status) throws IOException {
196                if (isOpen()) {
197                        if (logger.isDebugEnabled()) {
198                                logger.debug("Closing SockJS session " + getId() + " with " + status);
199                        }
200                        this.state = State.CLOSED;
201                        try {
202                                if (isActive() && !CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
203                                        try {
204                                                writeFrameInternal(SockJsFrame.closeFrame(status.getCode(), status.getReason()));
205                                        }
206                                        catch (Throwable ex) {
207                                                logger.debug("Failure while sending SockJS close frame", ex);
208                                        }
209                                }
210                                updateLastActiveTime();
211                                cancelHeartbeat();
212                                disconnect(status);
213                        }
214                        finally {
215                                try {
216                                        this.handler.afterConnectionClosed(this, status);
217                                }
218                                catch (Throwable ex) {
219                                        logger.debug("Error from WebSocketHandler.afterConnectionClosed in " + this, ex);
220                                }
221                        }
222                }
223        }
224
225        @Override
226        public long getTimeSinceLastActive() {
227                if (isNew()) {
228                        return (System.currentTimeMillis() - this.timeCreated);
229                }
230                else {
231                        return (isActive() ? 0 : System.currentTimeMillis() - this.timeLastActive);
232                }
233        }
234
235        /**
236         * Should be invoked whenever the session becomes inactive.
237         */
238        protected void updateLastActiveTime() {
239                this.timeLastActive = System.currentTimeMillis();
240        }
241
242        @Override
243        public void disableHeartbeat() {
244                this.heartbeatDisabled = true;
245                cancelHeartbeat();
246        }
247
248        public void sendHeartbeat() throws SockJsTransportFailureException {
249                synchronized (this.responseLock) {
250                        if (isActive() && !this.heartbeatDisabled) {
251                                writeFrame(SockJsFrame.heartbeatFrame());
252                                scheduleHeartbeat();
253                        }
254                }
255        }
256
257        protected void scheduleHeartbeat() {
258                if (this.heartbeatDisabled) {
259                        return;
260                }
261                synchronized (this.responseLock) {
262                        cancelHeartbeat();
263                        if (!isActive()) {
264                                return;
265                        }
266                        Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
267                        this.heartbeatTask = new HeartbeatTask();
268                        this.heartbeatFuture = this.config.getTaskScheduler().schedule(this.heartbeatTask, time);
269                        if (logger.isTraceEnabled()) {
270                                logger.trace("Scheduled heartbeat in session " + getId());
271                        }
272                }
273        }
274
275        protected void cancelHeartbeat() {
276                synchronized (this.responseLock) {
277                        if (this.heartbeatFuture != null) {
278                                if (logger.isTraceEnabled()) {
279                                        logger.trace("Cancelling heartbeat in session " + getId());
280                                }
281                                this.heartbeatFuture.cancel(false);
282                                this.heartbeatFuture = null;
283                        }
284                        if (this.heartbeatTask != null) {
285                                this.heartbeatTask.cancel();
286                                this.heartbeatTask = null;
287                        }
288                }
289        }
290
291        /**
292         * Polling and Streaming sessions periodically close the current HTTP request and
293         * wait for the next request to come through. During this "downtime" the session is
294         * still open but inactive and unable to send messages and therefore has to buffer
295         * them temporarily. A WebSocket session by contrast is stateful and remain active
296         * until closed.
297         */
298        public abstract boolean isActive();
299
300        /**
301         * Actually close the underlying WebSocket session or in the case of HTTP
302         * transports complete the underlying request.
303         */
304        protected abstract void disconnect(CloseStatus status) throws IOException;
305
306
307        // Frame writing
308
309        /**
310         * For internal use within a TransportHandler and the (TransportHandler-specific)
311         * session class.
312         */
313        protected void writeFrame(SockJsFrame frame) throws SockJsTransportFailureException {
314                if (logger.isTraceEnabled()) {
315                        logger.trace("Preparing to write " + frame);
316                }
317                try {
318                        writeFrameInternal(frame);
319                }
320                catch (Throwable ex) {
321                        logWriteFrameFailure(ex);
322                        try {
323                                // Force disconnect (so we won't try to send close frame)
324                                disconnect(CloseStatus.SERVER_ERROR);
325                        }
326                        catch (Throwable disconnectFailure) {
327                                // Ignore
328                        }
329                        try {
330                                close(CloseStatus.SERVER_ERROR);
331                        }
332                        catch (Throwable closeFailure) {
333                                // Nothing of consequence, already forced disconnect
334                        }
335                        throw new SockJsTransportFailureException("Failed to write " + frame, getId(), ex);
336                }
337        }
338
339        protected abstract void writeFrameInternal(SockJsFrame frame) throws IOException;
340
341        private void logWriteFrameFailure(Throwable ex) {
342                if (indicatesDisconnectedClient(ex)) {
343                        if (disconnectedClientLogger.isTraceEnabled()) {
344                                disconnectedClientLogger.trace("Looks like the client has gone away", ex);
345                        }
346                        else if (disconnectedClientLogger.isDebugEnabled()) {
347                                disconnectedClientLogger.debug("Looks like the client has gone away: " + ex +
348                                                " (For a full stack trace, set the log category '" + DISCONNECTED_CLIENT_LOG_CATEGORY +
349                                                "' to TRACE level.)");
350                        }
351                }
352                else {
353                        logger.debug("Terminating connection after failure to send message to client", ex);
354                }
355        }
356
357        private boolean indicatesDisconnectedClient(Throwable ex)  {
358                String message = NestedExceptionUtils.getMostSpecificCause(ex).getMessage();
359                message = (message != null ? message.toLowerCase() : "");
360                String className = ex.getClass().getSimpleName();
361                return (message.contains("broken pipe") || DISCONNECTED_CLIENT_EXCEPTIONS.contains(className));
362        }
363
364
365        // Delegation methods
366
367        public void delegateConnectionEstablished() throws Exception {
368                this.state = State.OPEN;
369                this.handler.afterConnectionEstablished(this);
370        }
371
372        public void delegateMessages(String... messages) throws SockJsMessageDeliveryException {
373                List<String> undelivered = new ArrayList<String>(Arrays.asList(messages));
374                for (String message : messages) {
375                        try {
376                                if (isClosed()) {
377                                        throw new SockJsMessageDeliveryException(this.id, undelivered, "Session closed");
378                                }
379                                else {
380                                        this.handler.handleMessage(this, new TextMessage(message));
381                                        undelivered.remove(0);
382                                }
383                        }
384                        catch (Throwable ex) {
385                                throw new SockJsMessageDeliveryException(this.id, undelivered, ex);
386                        }
387                }
388        }
389
390        /**
391         * Invoked when the underlying connection is closed.
392         */
393        public final void delegateConnectionClosed(CloseStatus status) throws Exception {
394                if (!isClosed()) {
395                        try {
396                                updateLastActiveTime();
397                                // Avoid cancelHeartbeat() and responseLock within server "close" callback
398                                ScheduledFuture<?> future = this.heartbeatFuture;
399                                if (future != null) {
400                                        this.heartbeatFuture = null;
401                                        future.cancel(false);
402                                }
403                        }
404                        finally {
405                                this.state = State.CLOSED;
406                                this.handler.afterConnectionClosed(this, status);
407                        }
408                }
409        }
410
411        /**
412         * Close due to error arising from SockJS transport handling.
413         */
414        public void tryCloseWithSockJsTransportError(Throwable error, CloseStatus closeStatus) {
415                if (logger.isDebugEnabled()) {
416                        logger.debug("Closing due to transport error for " + this);
417                }
418                try {
419                        delegateError(error);
420                }
421                catch (Throwable delegateException) {
422                        // Ignore
423                        logger.debug("Exception from error handling delegate", delegateException);
424                }
425                try {
426                        close(closeStatus);
427                }
428                catch (Throwable closeException) {
429                        logger.debug("Failure while closing " + this, closeException);
430                }
431        }
432
433        public void delegateError(Throwable ex) throws Exception {
434                this.handler.handleTransportError(this, ex);
435        }
436
437
438        // Self description
439
440        @Override
441        public String toString() {
442                return getClass().getSimpleName() + "[id=" + getId() + "]";
443        }
444
445
446        private class HeartbeatTask implements Runnable {
447
448                private boolean expired;
449
450                @Override
451                public void run() {
452                        synchronized (responseLock) {
453                                if (!this.expired && !isClosed()) {
454                                        try {
455                                                sendHeartbeat();
456                                        }
457                                        catch (Throwable ex) {
458                                                // Ignore: already handled in writeFrame...
459                                        }
460                                        finally {
461                                                this.expired = true;
462                                        }
463                                }
464                        }
465                }
466
467                void cancel() {
468                        this.expired = true;
469                }
470        }
471
472}