/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.sockjs.client;

import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Enumeration;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;

import org.springframework.context.Lifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;

/**
 * An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}.
 *
 * <p>When used for testing purposes (e.g. load testing) the {@code HttpClient}
 * properties must be set to allow a larger than usual number of connections and
 * threads. For example:
 *
 * <pre class="code">
 * HttpClient httpClient = new HttpClient();
 * httpClient.setMaxConnectionsPerDestination(1000);
 * httpClient.setExecutor(new QueuedThreadPool(500));
 * </pre>
 *
 * @author Rossen Stoyanchev
 * @since 4.1
 */
public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle {

	private final HttpClient httpClient;


	/**
	 * Create a transport that issues all of its requests through the given client.
	 * @param httpClient the Jetty client to use (must not be {@code null})
	 */
	public JettyXhrTransport(HttpClient httpClient) {
		Assert.notNull(httpClient, "'httpClient' is required");
		this.httpClient = httpClient;
	}


	/**
	 * Return the Jetty {@code HttpClient} this transport was created with.
	 */
	public HttpClient getHttpClient() {
		return this.httpClient;
	}

	/**
	 * Start the underlying {@code HttpClient} unless it is already running.
	 * @throws SockJsException wrapping any exception raised by the client
	 */
	@Override
	public void start() {
		try {
			if (!this.httpClient.isRunning()) {
				this.httpClient.start();
			}
		}
		catch (Exception ex) {
			throw new SockJsException("Failed to start JettyXhrTransport", ex);
		}
	}

	/**
	 * Stop the underlying {@code HttpClient} if it is currently running.
	 * @throws SockJsException wrapping any exception raised by the client
	 */
	@Override
	public void stop() {
		try {
			if (this.httpClient.isRunning()) {
				this.httpClient.stop();
			}
		}
		catch (Exception ex) {
			throw new SockJsException("Failed to stop JettyXhrTransport", ex);
		}
	}

	/**
	 * Report whether the underlying {@code HttpClient} is running.
	 */
	@Override
	public boolean isRunning() {
		return this.httpClient.isRunning();
	}


	/**
	 * Send the first XHR receive request for the given session.
	 * <p>The first request carries {@code handshakeHeaders}; the listener is
	 * created with the headers from {@code transportRequest.getHttpRequestHeaders()}
	 * and uses those for every receive request it issues afterwards.
	 */
	@Override
	protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler,
			URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
			SettableListenableFuture<WebSocketSession> connectFuture) {

		HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
		SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture);
		executeReceiveRequest(url, handshakeHeaders, listener);
	}

	/**
	 * Send a POST to {@code url} with the given headers and hand the response
	 * events to {@code listener}. The call returns without waiting for the response.
	 */
	private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
		if (logger.isTraceEnabled()) {
			logger.trace("Starting XHR receive request, url=" + url);
		}
		Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
		addHttpHeaders(httpRequest, headers);
		httpRequest.send(listener);
	}

	/**
	 * Execute the SockJS "info" request as a GET without a body.
	 */
	@Override
	protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl, HttpHeaders headers) {
		return executeRequest(infoUrl, HttpMethod.GET, headers, null);
	}

	/**
	 * Send the payload of the given message as the body of a POST request.
	 */
	@Override
	public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
		return executeRequest(url, HttpMethod.POST, headers, message.getPayload());
	}

	/**
	 * Execute a request and wait for the complete response.
	 * <p>NOTE(review): the status is converted with {@code HttpStatus.valueOf},
	 * which rejects codes that have no {@code HttpStatus} constant; such a
	 * response surfaces as an {@code IllegalArgumentException} rather than as a
	 * {@code ResponseEntity}.
	 * @param url the target URL
	 * @param method the HTTP method to use
	 * @param headers the headers to add to the request
	 * @param body the request body, or {@code null} to send none
	 * @return the response status and headers, plus the content as a String when
	 * the response has content
	 * @throws SockJsTransportFailureException if sending the request fails
	 */
	protected ResponseEntity<String> executeRequest(URI url, HttpMethod method,
			HttpHeaders headers, @Nullable String body) {

		Request httpRequest = this.httpClient.newRequest(url).method(method);
		addHttpHeaders(httpRequest, headers);
		if (body != null) {
			httpRequest.content(new StringContentProvider(body));
		}
		ContentResponse response;
		try {
			response = httpRequest.send();
		}
		catch (Exception ex) {
			throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
		}
		HttpStatus status = HttpStatus.valueOf(response.getStatus());
		HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders());
		return (response.getContent() != null ?
				new ResponseEntity<>(response.getContentAsString(), responseHeaders, status) :
				new ResponseEntity<>(responseHeaders, status));
	}


	/**
	 * Copy every header value onto the Jetty request, one {@code header} call
	 * per value so that multi-valued headers are preserved.
	 */
	private static void addHttpHeaders(Request request, HttpHeaders headers) {
		headers.forEach((key, values) -> {
			for (String value : values) {
				request.header(key, value);
			}
		});
	}

	/**
	 * Convert Jetty's {@code HttpFields} to Spring {@code HttpHeaders}, adding
	 * every value of every field name.
	 */
	private static HttpHeaders toHttpHeaders(HttpFields httpFields) {
		HttpHeaders responseHeaders = new HttpHeaders();
		Enumeration<String> names = httpFields.getFieldNames();
		while (names.hasMoreElements()) {
			String name = names.nextElement();
			Enumeration<String> values = httpFields.getValues(name);
			while (values.hasMoreElements()) {
				String value = values.nextElement();
				responseHeaders.add(name, value);
			}
		}
		return responseHeaders;
	}


	/**
	 * Jetty client {@link org.eclipse.jetty.client.api.Response.Listener Response
	 * Listener} that splits the body of the response into SockJS frames and
	 * delegates them to the {@link XhrClientSockJsSession}.
	 */
	private class SockJsResponseListener extends Response.Listener.Adapter {

		private final URI transportUrl;

		private final HttpHeaders receiveHeaders;

		private final XhrClientSockJsSession sockJsSession;

		private final SettableListenableFuture<WebSocketSession> connectFuture;

		// Bytes of the frame currently being assembled (everything since the last '\n').
		private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

		/**
		 * @param url the URL used for follow-up receive requests
		 * @param headers the headers used for follow-up receive requests
		 * @param sockJsSession the session that receives the frames
		 * @param connectFuture the future that is failed if the request fails
		 */
		public SockJsResponseListener(URI url, HttpHeaders headers, XhrClientSockJsSession sockJsSession,
				SettableListenableFuture<WebSocketSession> connectFuture) {

			this.transportUrl = url;
			this.receiveHeaders = headers;
			this.connectFuture = connectFuture;
			this.sockJsSession = sockJsSession;
		}

		/**
		 * Abort the response unless its status is 200.
		 */
		@Override
		public void onBegin(Response response) {
			int rawStatus = response.getStatus();
			if (rawStatus == 200) {
				return;
			}
			// HttpStatus.valueOf throws IllegalArgumentException for a code without
			// an enum constant, which would leave the abort below unreached.
			// resolve returns null for such codes instead.
			HttpStatus status = HttpStatus.resolve(rawStatus);
			Throwable cause = (status != null ?
					new HttpServerErrorException(status, "Unexpected XHR receive status") :
					new SockJsException("Unexpected XHR receive status: " + rawStatus,
							this.sockJsSession.getId(), null));
			response.abort(cause);
		}

		/**
		 * Log the response headers at trace level.
		 */
		@Override
		public void onHeaders(Response response) {
			if (logger.isTraceEnabled()) {
				// Convert to HttpHeaders to avoid "\n"
				logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
			}
		}

		/**
		 * Consume the buffer one byte at a time: a {@code '\n'} completes the
		 * current frame, any other byte is appended to it. The session state is
		 * checked before each byte (and once more when the buffer is exhausted);
		 * if the session is disconnected the response is aborted and the rest of
		 * the buffer is left unread.
		 */
		@Override
		public void onContent(Response response, ByteBuffer buffer) {
			while (true) {
				if (this.sockJsSession.isDisconnected()) {
					if (logger.isDebugEnabled()) {
						logger.debug("SockJS sockJsSession closed, closing response.");
					}
					response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null));
					return;
				}
				if (buffer.remaining() == 0) {
					break;
				}
				int b = buffer.get();
				if (b == '\n') {
					handleFrame();
				}
				else {
					this.outputStream.write(b);
				}
			}
		}

		/**
		 * Decode the accumulated bytes with {@code SockJsFrame.CHARSET}, reset
		 * the accumulator, and pass the text to the session unless it equals
		 * {@code PRELUDE}.
		 */
		private void handleFrame() {
			String content = StreamUtils.copyToString(this.outputStream, SockJsFrame.CHARSET);
			this.outputStream.reset();
			if (logger.isTraceEnabled()) {
				logger.trace("XHR content received: " + content);
			}
			if (!PRELUDE.equals(content)) {
				this.sockJsSession.handleFrame(content);
			}
		}

		/**
		 * Deliver any bytes left over without a trailing {@code '\n'} as a final
		 * frame, then issue the next receive request with this same listener.
		 */
		@Override
		public void onSuccess(Response response) {
			if (this.outputStream.size() > 0) {
				handleFrame();
			}
			if (logger.isTraceEnabled()) {
				logger.trace("XHR receive request completed.");
			}
			executeReceiveRequest(this.transportUrl, this.receiveHeaders, this);
		}

		/**
		 * Route a failed request. If the connect future accepts the failure
		 * ({@code setException} returns {@code true}) nothing else is done.
		 * Otherwise a disconnected session is told the transport closed with no
		 * status, and any other session is told about the error and then closed
		 * with status 1006 and the failure message as the reason.
		 */
		@Override
		public void onFailure(Response response, Throwable failure) {
			if (this.connectFuture.setException(failure)) {
				return;
			}
			if (this.sockJsSession.isDisconnected()) {
				this.sockJsSession.afterTransportClosed(null);
			}
			else {
				this.sockJsSession.handleTransportError(failure);
				this.sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
			}
		}
	}

}