001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.client;
018
019import java.io.ByteArrayOutputStream;
020import java.net.URI;
021import java.nio.ByteBuffer;
022import java.util.Enumeration;
023
024import org.eclipse.jetty.client.HttpClient;
025import org.eclipse.jetty.client.api.ContentResponse;
026import org.eclipse.jetty.client.api.Request;
027import org.eclipse.jetty.client.api.Response;
028import org.eclipse.jetty.client.util.StringContentProvider;
029import org.eclipse.jetty.http.HttpFields;
030import org.eclipse.jetty.http.HttpMethod;
031
032import org.springframework.context.Lifecycle;
033import org.springframework.http.HttpHeaders;
034import org.springframework.http.HttpStatus;
035import org.springframework.http.ResponseEntity;
036import org.springframework.lang.Nullable;
037import org.springframework.util.Assert;
038import org.springframework.util.StreamUtils;
039import org.springframework.util.concurrent.SettableListenableFuture;
040import org.springframework.web.client.HttpServerErrorException;
041import org.springframework.web.socket.CloseStatus;
042import org.springframework.web.socket.TextMessage;
043import org.springframework.web.socket.WebSocketHandler;
044import org.springframework.web.socket.WebSocketSession;
045import org.springframework.web.socket.sockjs.SockJsException;
046import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
047import org.springframework.web.socket.sockjs.frame.SockJsFrame;
048
049/**
050 * An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}.
051 *
052 * <p>When used for testing purposes (e.g. load testing) the {@code HttpClient}
053 * properties must be set to allow a larger than usual number of connections and
054 * threads. For example:
055 *
056 * <pre class="code">
057 * HttpClient httpClient = new HttpClient();
058 * httpClient.setMaxConnectionsPerDestination(1000);
059 * httpClient.setExecutor(new QueuedThreadPool(500));
060 * </pre>
061 *
062 * @author Rossen Stoyanchev
063 * @since 4.1
064 */
065public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle {
066
067        private final HttpClient httpClient;
068
069
070        public JettyXhrTransport(HttpClient httpClient) {
071                Assert.notNull(httpClient, "'httpClient' is required");
072                this.httpClient = httpClient;
073        }
074
075
076        public HttpClient getHttpClient() {
077                return this.httpClient;
078        }
079
080        @Override
081        public void start() {
082                try {
083                        if (!this.httpClient.isRunning()) {
084                                this.httpClient.start();
085                        }
086                }
087                catch (Exception ex) {
088                        throw new SockJsException("Failed to start JettyXhrTransport", ex);
089                }
090        }
091
092        @Override
093        public void stop() {
094                try {
095                        if (this.httpClient.isRunning()) {
096                                this.httpClient.stop();
097                        }
098                }
099                catch (Exception ex) {
100                        throw new SockJsException("Failed to stop JettyXhrTransport", ex);
101                }
102        }
103
104        @Override
105        public boolean isRunning() {
106                return this.httpClient.isRunning();
107        }
108
109
110        @Override
111        protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler,
112                        URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
113                        SettableListenableFuture<WebSocketSession> connectFuture) {
114
115                HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
116                SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture);
117                executeReceiveRequest(url, handshakeHeaders, listener);
118        }
119
120        private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
121                if (logger.isTraceEnabled()) {
122                        logger.trace("Starting XHR receive request, url=" + url);
123                }
124                Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
125                addHttpHeaders(httpRequest, headers);
126                httpRequest.send(listener);
127        }
128
129        @Override
130        protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl, HttpHeaders headers) {
131                return executeRequest(infoUrl, HttpMethod.GET, headers, null);
132        }
133
134        @Override
135        public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
136                return executeRequest(url, HttpMethod.POST, headers, message.getPayload());
137        }
138
139        protected ResponseEntity<String> executeRequest(URI url, HttpMethod method,
140                        HttpHeaders headers, @Nullable String body) {
141
142                Request httpRequest = this.httpClient.newRequest(url).method(method);
143                addHttpHeaders(httpRequest, headers);
144                if (body != null) {
145                        httpRequest.content(new StringContentProvider(body));
146                }
147                ContentResponse response;
148                try {
149                        response = httpRequest.send();
150                }
151                catch (Exception ex) {
152                        throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
153                }
154                HttpStatus status = HttpStatus.valueOf(response.getStatus());
155                HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders());
156                return (response.getContent() != null ?
157                                new ResponseEntity<>(response.getContentAsString(), responseHeaders, status) :
158                                new ResponseEntity<>(responseHeaders, status));
159        }
160
161
162        private static void addHttpHeaders(Request request, HttpHeaders headers) {
163                headers.forEach((key, values) -> {
164                        for (String value : values) {
165                                request.header(key, value);
166                        }
167                });
168        }
169
170        private static HttpHeaders toHttpHeaders(HttpFields httpFields) {
171                HttpHeaders responseHeaders = new HttpHeaders();
172                Enumeration<String> names = httpFields.getFieldNames();
173                while (names.hasMoreElements()) {
174                        String name = names.nextElement();
175                        Enumeration<String> values = httpFields.getValues(name);
176                        while (values.hasMoreElements()) {
177                                String value = values.nextElement();
178                                responseHeaders.add(name, value);
179                        }
180                }
181                return responseHeaders;
182        }
183
184
185        /**
186         * Jetty client {@link org.eclipse.jetty.client.api.Response.Listener Response
187         * Listener} that splits the body of the response into SockJS frames and
188         * delegates them to the {@link XhrClientSockJsSession}.
189         */
190        private class SockJsResponseListener extends Response.Listener.Adapter {
191
192                private final URI transportUrl;
193
194                private final HttpHeaders receiveHeaders;
195
196                private final XhrClientSockJsSession sockJsSession;
197
198                private final SettableListenableFuture<WebSocketSession> connectFuture;
199
200                private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
201
202                public SockJsResponseListener(URI url, HttpHeaders headers,     XhrClientSockJsSession sockJsSession,
203                                SettableListenableFuture<WebSocketSession> connectFuture) {
204
205                        this.transportUrl = url;
206                        this.receiveHeaders = headers;
207                        this.connectFuture = connectFuture;
208                        this.sockJsSession = sockJsSession;
209                }
210
211                @Override
212                public void onBegin(Response response) {
213                        if (response.getStatus() != 200) {
214                                HttpStatus status = HttpStatus.valueOf(response.getStatus());
215                                response.abort(new HttpServerErrorException(status, "Unexpected XHR receive status"));
216                        }
217                }
218
219                @Override
220                public void onHeaders(Response response) {
221                        if (logger.isTraceEnabled()) {
222                                // Convert to HttpHeaders to avoid "\n"
223                                logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
224                        }
225                }
226
227                @Override
228                public void onContent(Response response, ByteBuffer buffer) {
229                        while (true) {
230                                if (this.sockJsSession.isDisconnected()) {
231                                        if (logger.isDebugEnabled()) {
232                                                logger.debug("SockJS sockJsSession closed, closing response.");
233                                        }
234                                        response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null));
235                                        return;
236                                }
237                                if (buffer.remaining() == 0) {
238                                        break;
239                                }
240                                int b = buffer.get();
241                                if (b == '\n') {
242                                        handleFrame();
243                                }
244                                else {
245                                        this.outputStream.write(b);
246                                }
247                        }
248                }
249
250                private void handleFrame() {
251                        String content = StreamUtils.copyToString(this.outputStream, SockJsFrame.CHARSET);
252                        this.outputStream.reset();
253                        if (logger.isTraceEnabled()) {
254                                logger.trace("XHR content received: " + content);
255                        }
256                        if (!PRELUDE.equals(content)) {
257                                this.sockJsSession.handleFrame(content);
258                        }
259                }
260
261                @Override
262                public void onSuccess(Response response) {
263                        if (this.outputStream.size() > 0) {
264                                handleFrame();
265                        }
266                        if (logger.isTraceEnabled()) {
267                                logger.trace("XHR receive request completed.");
268                        }
269                        executeReceiveRequest(this.transportUrl, this.receiveHeaders, this);
270                }
271
272                @Override
273                public void onFailure(Response response, Throwable failure) {
274                        if (this.connectFuture.setException(failure)) {
275                                return;
276                        }
277                        if (this.sockJsSession.isDisconnected()) {
278                                this.sockJsSession.afterTransportClosed(null);
279                        }
280                        else {
281                                this.sockJsSession.handleTransportError(failure);
282                                this.sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
283                        }
284                }
285        }
286
287}