001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.transport.session;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.security.Principal;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.Queue;
027import java.util.concurrent.LinkedBlockingQueue;
028
029import javax.servlet.ServletRequest;
030
031import org.springframework.http.HttpHeaders;
032import org.springframework.http.server.ServerHttpAsyncRequestControl;
033import org.springframework.http.server.ServerHttpRequest;
034import org.springframework.http.server.ServerHttpResponse;
035import org.springframework.http.server.ServletServerHttpRequest;
036import org.springframework.lang.Nullable;
037import org.springframework.util.Assert;
038import org.springframework.web.filter.ShallowEtagHeaderFilter;
039import org.springframework.web.socket.CloseStatus;
040import org.springframework.web.socket.WebSocketExtension;
041import org.springframework.web.socket.WebSocketHandler;
042import org.springframework.web.socket.sockjs.SockJsException;
043import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
044import org.springframework.web.socket.sockjs.frame.SockJsFrame;
045import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
046import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
047
048/**
049 * An abstract base class for use with HTTP transport SockJS sessions.
050 *
051 * @author Rossen Stoyanchev
052 * @since 4.0
053 */
054public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
055
056        private final Queue<String> messageCache;
057
058        @Nullable
059        private volatile URI uri;
060
061        @Nullable
062        private volatile HttpHeaders handshakeHeaders;
063
064        @Nullable
065        private volatile Principal principal;
066
067        @Nullable
068        private volatile InetSocketAddress localAddress;
069
070        @Nullable
071        private volatile InetSocketAddress remoteAddress;
072
073        @Nullable
074        private volatile String acceptedProtocol;
075
076        @Nullable
077        private volatile ServerHttpResponse response;
078
079        @Nullable
080        private volatile SockJsFrameFormat frameFormat;
081
082        @Nullable
083        private volatile ServerHttpAsyncRequestControl asyncRequestControl;
084
085        private boolean readyToSend;
086
087
088        public AbstractHttpSockJsSession(String id, SockJsServiceConfig config,
089                        WebSocketHandler wsHandler, Map<String, Object> attributes) {
090
091                super(id, config, wsHandler, attributes);
092                this.messageCache = new LinkedBlockingQueue<>(config.getHttpMessageCacheSize());
093        }
094
095
096        @Override
097        public URI getUri() {
098                URI uri = this.uri;
099                Assert.state(uri != null, "No initial request yet");
100                return uri;
101        }
102
103        @Override
104        public HttpHeaders getHandshakeHeaders() {
105                HttpHeaders headers = this.handshakeHeaders;
106                Assert.state(headers != null, "No initial request yet");
107                return headers;
108        }
109
110        @Override
111        @Nullable
112        public Principal getPrincipal() {
113                return this.principal;
114        }
115
116        @Override
117        @Nullable
118        public InetSocketAddress getLocalAddress() {
119                return this.localAddress;
120        }
121
122        @Override
123        @Nullable
124        public InetSocketAddress getRemoteAddress() {
125                return this.remoteAddress;
126        }
127
128        /**
129         * Unlike WebSocket where sub-protocol negotiation is part of the initial
130         * handshake, in HTTP transports the same negotiation must be emulated and
131         * the selected protocol set through this setter.
132         * @param protocol the sub-protocol to set
133         */
134        public void setAcceptedProtocol(@Nullable String protocol) {
135                this.acceptedProtocol = protocol;
136        }
137
138        /**
139         * Return the selected sub-protocol to use.
140         */
141        @Override
142        @Nullable
143        public String getAcceptedProtocol() {
144                return this.acceptedProtocol;
145        }
146
147        /**
148         * Return the SockJS buffer for messages stored transparently between polling
149         * requests. If the polling request takes longer than 5 seconds, the session
150         * is closed.
151         * @see org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsService
152         */
153        protected Queue<String> getMessageCache() {
154                return this.messageCache;
155        }
156
157
158        @Override
159        public boolean isActive() {
160                ServerHttpAsyncRequestControl control = this.asyncRequestControl;
161                return (control != null && !control.isCompleted());
162        }
163
164        @Override
165        public void setTextMessageSizeLimit(int messageSizeLimit) {
166                // ignore
167        }
168
169        @Override
170        public int getTextMessageSizeLimit() {
171                return -1;
172        }
173
174        @Override
175        public void setBinaryMessageSizeLimit(int messageSizeLimit) {
176                // ignore
177        }
178
179        @Override
180        public int getBinaryMessageSizeLimit() {
181                return -1;
182        }
183
184        @Override
185        public List<WebSocketExtension> getExtensions() {
186                return Collections.emptyList();
187        }
188
189
190        /**
191         * Handle the first request for receiving messages on a SockJS HTTP transport
192         * based session.
193         * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request
194         * after writing the open frame. Streaming-based transports ("xhr_streaming",
195         * "eventsource", and "htmlfile") leave the response open longer for further
196         * streaming of message frames but will also close it eventually after some
197         * amount of data has been sent.
198         * @param request the current request
199         * @param response the current response
200         * @param frameFormat the transport-specific SocksJS frame format to use
201         */
202        public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response,
203                        SockJsFrameFormat frameFormat) throws SockJsException {
204
205                this.uri = request.getURI();
206                this.handshakeHeaders = request.getHeaders();
207                this.principal = request.getPrincipal();
208                try {
209                        this.localAddress = request.getLocalAddress();
210                }
211                catch (Exception ex) {
212                        // Ignore
213                }
214                try {
215                        this.remoteAddress = request.getRemoteAddress();
216                }
217                catch (Exception ex) {
218                        // Ignore
219                }
220
221                synchronized (this.responseLock) {
222                        try {
223                                this.response = response;
224                                this.frameFormat = frameFormat;
225                                ServerHttpAsyncRequestControl control = request.getAsyncRequestControl(response);
226                                this.asyncRequestControl = control;
227                                control.start(-1);
228                                disableShallowEtagHeaderFilter(request);
229                                // Let "our" handler know before sending the open frame to the remote handler
230                                delegateConnectionEstablished();
231                                handleRequestInternal(request, response, true);
232                                // Request might have been reset (e.g. polling sessions do after writing)
233                                this.readyToSend = isActive();
234                        }
235                        catch (Throwable ex) {
236                                tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
237                                throw new SockJsTransportFailureException("Failed to open session", getId(), ex);
238                        }
239                }
240        }
241
242        /**
243         * Handle all requests, except the first one, to receive messages on a SockJS
244         * HTTP transport based session.
245         * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request
246         * after writing any buffered message frames (or the next one). Streaming-based
247         * transports ("xhr_streaming", "eventsource", and "htmlfile") leave the
248         * response open longer for further streaming of message frames but will also
249         * close it eventually after some amount of data has been sent.
250         * @param request the current request
251         * @param response the current response
252         * @param frameFormat the transport-specific SocksJS frame format to use
253         */
254        public void handleSuccessiveRequest(ServerHttpRequest request, ServerHttpResponse response,
255                        SockJsFrameFormat frameFormat) throws SockJsException {
256
257                synchronized (this.responseLock) {
258                        try {
259                                if (isClosed()) {
260                                        response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes());
261                                        return;
262                                }
263                                this.response = response;
264                                this.frameFormat = frameFormat;
265                                ServerHttpAsyncRequestControl control = request.getAsyncRequestControl(response);
266                                this.asyncRequestControl = control;
267                                control.start(-1);
268                                disableShallowEtagHeaderFilter(request);
269                                handleRequestInternal(request, response, false);
270                                this.readyToSend = isActive();
271                        }
272                        catch (Throwable ex) {
273                                tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
274                                throw new SockJsTransportFailureException("Failed to handle SockJS receive request", getId(), ex);
275                        }
276                }
277        }
278
279        private void disableShallowEtagHeaderFilter(ServerHttpRequest request) {
280                if (request instanceof ServletServerHttpRequest) {
281                        ServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
282                        ShallowEtagHeaderFilter.disableContentCaching(servletRequest);
283                }
284        }
285
286        /**
287         * Invoked when a SockJS transport request is received.
288         * @param request the current request
289         * @param response the current response
290         * @param initialRequest whether it is the first request for the session
291         */
292        protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
293                        boolean initialRequest) throws IOException;
294
295        @Override
296        protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
297                synchronized (this.responseLock) {
298                        this.messageCache.add(message);
299                        if (logger.isTraceEnabled()) {
300                                logger.trace(this.messageCache.size() + " message(s) to flush in session " + getId());
301                        }
302                        if (isActive() && this.readyToSend) {
303                                if (logger.isTraceEnabled()) {
304                                        logger.trace("Session is active, ready to flush.");
305                                }
306                                cancelHeartbeat();
307                                flushCache();
308                        }
309                        else {
310                                if (logger.isTraceEnabled()) {
311                                        logger.trace("Session is not active, not ready to flush.");
312                                }
313                        }
314                }
315        }
316
317        /**
318         * Called when the connection is active and ready to write to the response.
319         * Subclasses should only call this method from a method where the
320         * "responseLock" is acquired.
321         */
322        protected abstract void flushCache() throws SockJsTransportFailureException;
323
324
325        @Override
326        protected void disconnect(CloseStatus status) {
327                resetRequest();
328        }
329
330        protected void resetRequest() {
331                synchronized (this.responseLock) {
332                        ServerHttpAsyncRequestControl control = this.asyncRequestControl;
333                        this.asyncRequestControl = null;
334                        this.readyToSend = false;
335                        this.response = null;
336                        updateLastActiveTime();
337                        if (control != null && !control.isCompleted() && control.isStarted()) {
338                                try {
339                                        control.complete();
340                                }
341                                catch (Throwable ex) {
342                                        // Could be part of normal workflow (e.g. browser tab closed)
343                                        logger.debug("Failed to complete request: " + ex.getMessage());
344                                }
345                        }
346                }
347        }
348
349        @Override
350        protected void writeFrameInternal(SockJsFrame frame) throws IOException {
351                if (isActive()) {
352                        SockJsFrameFormat frameFormat = this.frameFormat;
353                        ServerHttpResponse response = this.response;
354                        if (frameFormat != null && response != null) {
355                                String formattedFrame = frameFormat.format(frame);
356                                if (logger.isTraceEnabled()) {
357                                        logger.trace("Writing to HTTP response: " + formattedFrame);
358                                }
359                                response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET));
360                                response.flush();
361                        }
362                }
363        }
364
365}