001/*
002 * Copyright 2002-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.transport.session;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.security.Principal;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.Queue;
027import java.util.concurrent.LinkedBlockingQueue;
028
029import javax.servlet.ServletRequest;
030
031import org.springframework.http.HttpHeaders;
032import org.springframework.http.server.ServerHttpAsyncRequestControl;
033import org.springframework.http.server.ServerHttpRequest;
034import org.springframework.http.server.ServerHttpResponse;
035import org.springframework.http.server.ServletServerHttpRequest;
036import org.springframework.web.filter.ShallowEtagHeaderFilter;
037import org.springframework.web.socket.CloseStatus;
038import org.springframework.web.socket.WebSocketExtension;
039import org.springframework.web.socket.WebSocketHandler;
040import org.springframework.web.socket.sockjs.SockJsException;
041import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
042import org.springframework.web.socket.sockjs.frame.SockJsFrame;
043import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
044import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
045
046/**
047 * An abstract base class for use with HTTP transport SockJS sessions.
048 *
049 * @author Rossen Stoyanchev
050 * @since 4.0
051 */
052public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
053
054        private final Queue<String> messageCache;
055
056        private volatile URI uri;
057
058        private volatile HttpHeaders handshakeHeaders;
059
060        private volatile Principal principal;
061
062        private volatile InetSocketAddress localAddress;
063
064        private volatile InetSocketAddress remoteAddress;
065
066        private volatile String acceptedProtocol;
067
068        private volatile ServerHttpResponse response;
069
070        private volatile SockJsFrameFormat frameFormat;
071
072        private volatile ServerHttpAsyncRequestControl asyncRequestControl;
073
074        private boolean readyToSend;
075
076
077        public AbstractHttpSockJsSession(String id, SockJsServiceConfig config,
078                        WebSocketHandler wsHandler, Map<String, Object> attributes) {
079
080                super(id, config, wsHandler, attributes);
081                this.messageCache = new LinkedBlockingQueue<String>(config.getHttpMessageCacheSize());
082        }
083
084
085        @Override
086        public URI getUri() {
087                return this.uri;
088        }
089
090        @Override
091        public HttpHeaders getHandshakeHeaders() {
092                return this.handshakeHeaders;
093        }
094
095        @Override
096        public Principal getPrincipal() {
097                return this.principal;
098        }
099
100        @Override
101        public InetSocketAddress getLocalAddress() {
102                return this.localAddress;
103        }
104
105        @Override
106        public InetSocketAddress getRemoteAddress() {
107                return this.remoteAddress;
108        }
109
110        /**
111         * Unlike WebSocket where sub-protocol negotiation is part of the initial
112         * handshake, in HTTP transports the same negotiation must be emulated and
113         * the selected protocol set through this setter.
114         * @param protocol the sub-protocol to set
115         */
116        public void setAcceptedProtocol(String protocol) {
117                this.acceptedProtocol = protocol;
118        }
119
120        /**
121         * Return the selected sub-protocol to use.
122         */
123        public String getAcceptedProtocol() {
124                return this.acceptedProtocol;
125        }
126
127        /**
128         * Return the SockJS buffer for messages stored transparently between polling
129         * requests. If the polling request takes longer than 5 seconds, the session
130         * is closed.
131         * @see org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsService
132         */
133        protected Queue<String> getMessageCache() {
134                return this.messageCache;
135        }
136
137        @Override
138        public boolean isActive() {
139                ServerHttpAsyncRequestControl control = this.asyncRequestControl;
140                return (control != null && !control.isCompleted());
141        }
142
143        @Override
144        public void setTextMessageSizeLimit(int messageSizeLimit) {
145                // ignore
146        }
147
148        @Override
149        public int getTextMessageSizeLimit() {
150                return -1;
151        }
152
153        @Override
154        public void setBinaryMessageSizeLimit(int messageSizeLimit) {
155                // ignore
156        }
157
158        @Override
159        public int getBinaryMessageSizeLimit() {
160                return -1;
161        }
162
163        @Override
164        public List<WebSocketExtension> getExtensions() {
165                return Collections.emptyList();
166        }
167
168        /**
169         * @deprecated as of 4.2, since this method is no longer used.
170         */
171        @Deprecated
172        protected abstract boolean isStreaming();
173
174
175        /**
176         * Handle the first request for receiving messages on a SockJS HTTP transport
177         * based session.
178         * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request
179         * after writing the open frame. Streaming-based transports ("xhr_streaming",
180         * "eventsource", and "htmlfile") leave the response open longer for further
181         * streaming of message frames but will also close it eventually after some
182         * amount of data has been sent.
183         * @param request the current request
184         * @param response the current response
185         * @param frameFormat the transport-specific SocksJS frame format to use
186         */
187        public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response,
188                        SockJsFrameFormat frameFormat) throws SockJsException {
189
190                this.uri = request.getURI();
191                this.handshakeHeaders = request.getHeaders();
192                this.principal = request.getPrincipal();
193                try {
194                        this.localAddress = request.getLocalAddress();
195                }
196                catch (Exception ex) {
197                        // Ignore
198                }
199                try {
200                        this.remoteAddress = request.getRemoteAddress();
201                }
202                catch (Exception ex) {
203                        // Ignore
204                }
205
206                synchronized (this.responseLock) {
207                        try {
208                                this.response = response;
209                                this.frameFormat = frameFormat;
210                                this.asyncRequestControl = request.getAsyncRequestControl(response);
211                                this.asyncRequestControl.start(-1);
212                                disableShallowEtagHeaderFilter(request);
213                                // Let "our" handler know before sending the open frame to the remote handler
214                                delegateConnectionEstablished();
215                                handleRequestInternal(request, response, true);
216                                // Request might have been reset (e.g. polling sessions do after writing)
217                                this.readyToSend = isActive();
218                        }
219                        catch (Throwable ex) {
220                                tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
221                                throw new SockJsTransportFailureException("Failed to open session", getId(), ex);
222                        }
223                }
224        }
225
226        /**
227         * Handle all requests, except the first one, to receive messages on a SockJS
228         * HTTP transport based session.
229         * <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request
230         * after writing any buffered message frames (or the next one). Streaming-based
231         * transports ("xhr_streaming", "eventsource", and "htmlfile") leave the
232         * response open longer for further streaming of message frames but will also
233         * close it eventually after some amount of data has been sent.
234         * @param request the current request
235         * @param response the current response
236         * @param frameFormat the transport-specific SocksJS frame format to use
237         */
238        public void handleSuccessiveRequest(ServerHttpRequest request, ServerHttpResponse response,
239                        SockJsFrameFormat frameFormat) throws SockJsException {
240
241                synchronized (this.responseLock) {
242                        try {
243                                if (isClosed()) {
244                                        response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes());
245                                        return;
246                                }
247                                this.response = response;
248                                this.frameFormat = frameFormat;
249                                this.asyncRequestControl = request.getAsyncRequestControl(response);
250                                this.asyncRequestControl.start(-1);
251                                disableShallowEtagHeaderFilter(request);
252                                handleRequestInternal(request, response, false);
253                                this.readyToSend = isActive();
254                        }
255                        catch (Throwable ex) {
256                                tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
257                                throw new SockJsTransportFailureException("Failed to handle SockJS receive request", getId(), ex);
258                        }
259                }
260        }
261
262        private void disableShallowEtagHeaderFilter(ServerHttpRequest request) {
263                if (request instanceof ServletServerHttpRequest) {
264                        ServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
265                        ShallowEtagHeaderFilter.disableContentCaching(servletRequest);
266                }
267        }
268
269        /**
270         * Invoked when a SockJS transport request is received.
271         * @param request the current request
272         * @param response the current response
273         * @param initialRequest whether it is the first request for the session
274         */
275        protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
276                        boolean initialRequest) throws IOException;
277
278        @Override
279        protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
280                synchronized (this.responseLock) {
281                        this.messageCache.add(message);
282                        if (logger.isTraceEnabled()) {
283                                logger.trace(this.messageCache.size() + " message(s) to flush in session " + this.getId());
284                        }
285                        if (isActive() && this.readyToSend) {
286                                if (logger.isTraceEnabled()) {
287                                        logger.trace("Session is active, ready to flush.");
288                                }
289                                cancelHeartbeat();
290                                flushCache();
291                        }
292                        else {
293                                if (logger.isTraceEnabled()) {
294                                        logger.trace("Session is not active, not ready to flush.");
295                                }
296                        }
297                }
298        }
299
300        /**
301         * Called when the connection is active and ready to write to the response.
302         * Subclasses should only call this method from a method where the
303         * "responseLock" is acquired.
304         */
305        protected abstract void flushCache() throws SockJsTransportFailureException;
306
307
308        /**
309         * @deprecated as of 4.2 this method is deprecated since the prelude is written
310         * in {@link #handleRequestInternal} of the StreamingSockJsSession subclass.
311         */
312        @Deprecated
313        protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
314        }
315
316        @Override
317        protected void disconnect(CloseStatus status) {
318                resetRequest();
319        }
320
321        protected void resetRequest() {
322                synchronized (this.responseLock) {
323                        ServerHttpAsyncRequestControl control = this.asyncRequestControl;
324                        this.asyncRequestControl = null;
325                        this.readyToSend = false;
326                        this.response = null;
327                        updateLastActiveTime();
328                        if (control != null && !control.isCompleted()) {
329                                if (control.isStarted()) {
330                                        try {
331                                                control.complete();
332                                        }
333                                        catch (Throwable ex) {
334                                                // Could be part of normal workflow (e.g. browser tab closed)
335                                                logger.debug("Failed to complete request: " + ex.getMessage());
336                                        }
337                                }
338                        }
339                }
340        }
341
342        @Override
343        protected void writeFrameInternal(SockJsFrame frame) throws IOException {
344                if (isActive()) {
345                        String formattedFrame = this.frameFormat.format(frame);
346                        if (logger.isTraceEnabled()) {
347                                logger.trace("Writing to HTTP response: " + formattedFrame);
348                        }
349                        this.response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET));
350                        this.response.flush();
351                }
352        }
353
354}