001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.client;
018
019import java.io.ByteArrayOutputStream;
020import java.net.URI;
021import java.nio.ByteBuffer;
022import java.util.Enumeration;
023import java.util.List;
024import java.util.Map;
025
026import org.eclipse.jetty.client.HttpClient;
027import org.eclipse.jetty.client.api.ContentResponse;
028import org.eclipse.jetty.client.api.Request;
029import org.eclipse.jetty.client.api.Response;
030import org.eclipse.jetty.client.util.StringContentProvider;
031import org.eclipse.jetty.http.HttpFields;
032import org.eclipse.jetty.http.HttpMethod;
033
034import org.springframework.context.Lifecycle;
035import org.springframework.http.HttpHeaders;
036import org.springframework.http.HttpStatus;
037import org.springframework.http.ResponseEntity;
038import org.springframework.util.Assert;
039import org.springframework.util.concurrent.SettableListenableFuture;
040import org.springframework.web.client.HttpServerErrorException;
041import org.springframework.web.socket.CloseStatus;
042import org.springframework.web.socket.TextMessage;
043import org.springframework.web.socket.WebSocketHandler;
044import org.springframework.web.socket.WebSocketSession;
045import org.springframework.web.socket.sockjs.SockJsException;
046import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
047import org.springframework.web.socket.sockjs.frame.SockJsFrame;
048
049/**
050 * An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}.
051 *
052 * <p>When used for testing purposes (e.g. load testing) the {@code HttpClient}
053 * properties must be set to allow a larger than usual number of connections and
054 * threads. For example:
055 *
056 * <pre class="code">
057 * HttpClient httpClient = new HttpClient();
058 * httpClient.setMaxConnectionsPerDestination(1000);
059 * httpClient.setExecutor(new QueuedThreadPool(500));
060 * </pre>
061 *
062 * @author Rossen Stoyanchev
063 * @since 4.1
064 */
065public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle {
066
067        private final HttpClient httpClient;
068
069
070        public JettyXhrTransport(HttpClient httpClient) {
071                Assert.notNull(httpClient, "'httpClient' is required");
072                this.httpClient = httpClient;
073        }
074
075
076        public HttpClient getHttpClient() {
077                return this.httpClient;
078        }
079
080        @Override
081        public void start() {
082                try {
083                        if (!this.httpClient.isRunning()) {
084                                this.httpClient.start();
085                        }
086                }
087                catch (Exception ex) {
088                        throw new SockJsException("Failed to start JettyXhrTransport", ex);
089                }
090        }
091
092        @Override
093        public void stop() {
094                try {
095                        if (this.httpClient.isRunning()) {
096                                this.httpClient.stop();
097                        }
098                }
099                catch (Exception ex) {
100                        throw new SockJsException("Failed to stop JettyXhrTransport", ex);
101                }
102        }
103
104        @Override
105        public boolean isRunning() {
106                return this.httpClient.isRunning();
107        }
108
109
110        @Override
111        protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler,
112                        URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
113                        SettableListenableFuture<WebSocketSession> connectFuture) {
114
115                HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
116                SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture);
117                executeReceiveRequest(url, handshakeHeaders, listener);
118        }
119
120        private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
121                if (logger.isTraceEnabled()) {
122                        logger.trace("Starting XHR receive request, url=" + url);
123                }
124                Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
125                addHttpHeaders(httpRequest, headers);
126                httpRequest.send(listener);
127        }
128
129        @Override
130        protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl, HttpHeaders headers) {
131                return executeRequest(infoUrl, HttpMethod.GET, headers, null);
132        }
133
134        @Override
135        public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
136                return executeRequest(url, HttpMethod.POST, headers, message.getPayload());
137        }
138
139        protected ResponseEntity<String> executeRequest(URI url, HttpMethod method, HttpHeaders headers, String body) {
140                Request httpRequest = this.httpClient.newRequest(url).method(method);
141                addHttpHeaders(httpRequest, headers);
142                if (body != null) {
143                        httpRequest.content(new StringContentProvider(body));
144                }
145                ContentResponse response;
146                try {
147                        response = httpRequest.send();
148                }
149                catch (Exception ex) {
150                        throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
151                }
152                HttpStatus status = HttpStatus.valueOf(response.getStatus());
153                HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders());
154                return (response.getContent() != null ?
155                        new ResponseEntity<String>(response.getContentAsString(), responseHeaders, status) :
156                        new ResponseEntity<String>(responseHeaders, status));
157        }
158
159
160        private static void addHttpHeaders(Request request, HttpHeaders headers) {
161                for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
162                        for (String value : entry.getValue()) {
163                                request.header(entry.getKey(), value);
164                        }
165                }
166        }
167
168        private static HttpHeaders toHttpHeaders(HttpFields httpFields) {
169                HttpHeaders responseHeaders = new HttpHeaders();
170                Enumeration<String> names = httpFields.getFieldNames();
171                while (names.hasMoreElements()) {
172                        String name = names.nextElement();
173                        Enumeration<String> values = httpFields.getValues(name);
174                        while (values.hasMoreElements()) {
175                                String value = values.nextElement();
176                                responseHeaders.add(name, value);
177                        }
178                }
179                return responseHeaders;
180        }
181
182
183        /**
184         * Jetty client {@link org.eclipse.jetty.client.api.Response.Listener Response
185         * Listener} that splits the body of the response into SockJS frames and
186         * delegates them to the {@link XhrClientSockJsSession}.
187         */
188        private class SockJsResponseListener extends Response.Listener.Adapter {
189
190                private final URI transportUrl;
191
192                private final HttpHeaders receiveHeaders;
193
194                private final XhrClientSockJsSession sockJsSession;
195
196                private final SettableListenableFuture<WebSocketSession> connectFuture;
197
198                private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
199
200                public SockJsResponseListener(URI url, HttpHeaders headers,     XhrClientSockJsSession sockJsSession,
201                                SettableListenableFuture<WebSocketSession> connectFuture) {
202
203                        this.transportUrl = url;
204                        this.receiveHeaders = headers;
205                        this.connectFuture = connectFuture;
206                        this.sockJsSession = sockJsSession;
207                }
208
209                @Override
210                public void onBegin(Response response) {
211                        if (response.getStatus() != 200) {
212                                HttpStatus status = HttpStatus.valueOf(response.getStatus());
213                                response.abort(new HttpServerErrorException(status, "Unexpected XHR receive status"));
214                        }
215                }
216
217                @Override
218                public void onHeaders(Response response) {
219                        if (logger.isTraceEnabled()) {
220                                // Convert to HttpHeaders to avoid "\n"
221                                logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
222                        }
223                }
224
225                @Override
226                public void onContent(Response response, ByteBuffer buffer) {
227                        while (true) {
228                                if (this.sockJsSession.isDisconnected()) {
229                                        if (logger.isDebugEnabled()) {
230                                                logger.debug("SockJS sockJsSession closed, closing response.");
231                                        }
232                                        response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null));
233                                        return;
234                                }
235                                if (buffer.remaining() == 0) {
236                                        break;
237                                }
238                                int b = buffer.get();
239                                if (b == '\n') {
240                                        handleFrame();
241                                }
242                                else {
243                                        this.outputStream.write(b);
244                                }
245                        }
246                }
247
248                private void handleFrame() {
249                        byte[] bytes = this.outputStream.toByteArray();
250                        this.outputStream.reset();
251                        String content = new String(bytes, SockJsFrame.CHARSET);
252                        if (logger.isTraceEnabled()) {
253                                logger.trace("XHR content received: " + content);
254                        }
255                        if (!PRELUDE.equals(content)) {
256                                this.sockJsSession.handleFrame(new String(bytes, SockJsFrame.CHARSET));
257                        }
258                }
259
260                @Override
261                public void onSuccess(Response response) {
262                        if (this.outputStream.size() > 0) {
263                                handleFrame();
264                        }
265                        if (logger.isTraceEnabled()) {
266                                logger.trace("XHR receive request completed.");
267                        }
268                        executeReceiveRequest(this.transportUrl, this.receiveHeaders, this);
269                }
270
271                @Override
272                public void onFailure(Response response, Throwable failure) {
273                        if (connectFuture.setException(failure)) {
274                                return;
275                        }
276                        if (this.sockJsSession.isDisconnected()) {
277                                this.sockJsSession.afterTransportClosed(null);
278                        }
279                        else {
280                                this.sockJsSession.handleTransportError(failure);
281                                this.sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
282                        }
283                }
284        }
285
286}