001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.client; 018 019import java.io.ByteArrayOutputStream; 020import java.net.URI; 021import java.nio.ByteBuffer; 022import java.util.Enumeration; 023import java.util.List; 024import java.util.Map; 025 026import org.eclipse.jetty.client.HttpClient; 027import org.eclipse.jetty.client.api.ContentResponse; 028import org.eclipse.jetty.client.api.Request; 029import org.eclipse.jetty.client.api.Response; 030import org.eclipse.jetty.client.util.StringContentProvider; 031import org.eclipse.jetty.http.HttpFields; 032import org.eclipse.jetty.http.HttpMethod; 033 034import org.springframework.context.Lifecycle; 035import org.springframework.http.HttpHeaders; 036import org.springframework.http.HttpStatus; 037import org.springframework.http.ResponseEntity; 038import org.springframework.util.Assert; 039import org.springframework.util.concurrent.SettableListenableFuture; 040import org.springframework.web.client.HttpServerErrorException; 041import org.springframework.web.socket.CloseStatus; 042import org.springframework.web.socket.TextMessage; 043import org.springframework.web.socket.WebSocketHandler; 044import org.springframework.web.socket.WebSocketSession; 045import org.springframework.web.socket.sockjs.SockJsException; 046import org.springframework.web.socket.sockjs.SockJsTransportFailureException; 047import org.springframework.web.socket.sockjs.frame.SockJsFrame; 048 049/** 050 * An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}. 051 * 052 * <p>When used for testing purposes (e.g. load testing) the {@code HttpClient} 053 * properties must be set to allow a larger than usual number of connections and 054 * threads. For example: 055 * 056 * <pre class="code"> 057 * HttpClient httpClient = new HttpClient(); 058 * httpClient.setMaxConnectionsPerDestination(1000); 059 * httpClient.setExecutor(new QueuedThreadPool(500)); 060 * </pre> 061 * 062 * @author Rossen Stoyanchev 063 * @since 4.1 064 */ 065public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle { 066 067 private final HttpClient httpClient; 068 069 070 public JettyXhrTransport(HttpClient httpClient) { 071 Assert.notNull(httpClient, "'httpClient' is required"); 072 this.httpClient = httpClient; 073 } 074 075 076 public HttpClient getHttpClient() { 077 return this.httpClient; 078 } 079 080 @Override 081 public void start() { 082 try { 083 if (!this.httpClient.isRunning()) { 084 this.httpClient.start(); 085 } 086 } 087 catch (Exception ex) { 088 throw new SockJsException("Failed to start JettyXhrTransport", ex); 089 } 090 } 091 092 @Override 093 public void stop() { 094 try { 095 if (this.httpClient.isRunning()) { 096 this.httpClient.stop(); 097 } 098 } 099 catch (Exception ex) { 100 throw new SockJsException("Failed to stop JettyXhrTransport", ex); 101 } 102 } 103 104 @Override 105 public boolean isRunning() { 106 return this.httpClient.isRunning(); 107 } 108 109 110 @Override 111 protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler, 112 URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session, 113 SettableListenableFuture<WebSocketSession> connectFuture) { 114 115 HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders(); 116 SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture); 117 executeReceiveRequest(url, handshakeHeaders, listener); 118 } 119 120 private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) { 121 if (logger.isTraceEnabled()) { 122 logger.trace("Starting XHR receive request, url=" + url); 123 } 124 Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST); 125 addHttpHeaders(httpRequest, headers); 126 httpRequest.send(listener); 127 } 128 129 @Override 130 protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl, HttpHeaders headers) { 131 return executeRequest(infoUrl, HttpMethod.GET, headers, null); 132 } 133 134 @Override 135 public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) { 136 return executeRequest(url, HttpMethod.POST, headers, message.getPayload()); 137 } 138 139 protected ResponseEntity<String> executeRequest(URI url, HttpMethod method, HttpHeaders headers, String body) { 140 Request httpRequest = this.httpClient.newRequest(url).method(method); 141 addHttpHeaders(httpRequest, headers); 142 if (body != null) { 143 httpRequest.content(new StringContentProvider(body)); 144 } 145 ContentResponse response; 146 try { 147 response = httpRequest.send(); 148 } 149 catch (Exception ex) { 150 throw new SockJsTransportFailureException("Failed to execute request to " + url, ex); 151 } 152 HttpStatus status = HttpStatus.valueOf(response.getStatus()); 153 HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders()); 154 return (response.getContent() != null ? 155 new ResponseEntity<String>(response.getContentAsString(), responseHeaders, status) : 156 new ResponseEntity<String>(responseHeaders, status)); 157 } 158 159 160 private static void addHttpHeaders(Request request, HttpHeaders headers) { 161 for (Map.Entry<String, List<String>> entry : headers.entrySet()) { 162 for (String value : entry.getValue()) { 163 request.header(entry.getKey(), value); 164 } 165 } 166 } 167 168 private static HttpHeaders toHttpHeaders(HttpFields httpFields) { 169 HttpHeaders responseHeaders = new HttpHeaders(); 170 Enumeration<String> names = httpFields.getFieldNames(); 171 while (names.hasMoreElements()) { 172 String name = names.nextElement(); 173 Enumeration<String> values = httpFields.getValues(name); 174 while (values.hasMoreElements()) { 175 String value = values.nextElement(); 176 responseHeaders.add(name, value); 177 } 178 } 179 return responseHeaders; 180 } 181 182 183 /** 184 * Jetty client {@link org.eclipse.jetty.client.api.Response.Listener Response 185 * Listener} that splits the body of the response into SockJS frames and 186 * delegates them to the {@link XhrClientSockJsSession}. 187 */ 188 private class SockJsResponseListener extends Response.Listener.Adapter { 189 190 private final URI transportUrl; 191 192 private final HttpHeaders receiveHeaders; 193 194 private final XhrClientSockJsSession sockJsSession; 195 196 private final SettableListenableFuture<WebSocketSession> connectFuture; 197 198 private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); 199 200 public SockJsResponseListener(URI url, HttpHeaders headers, XhrClientSockJsSession sockJsSession, 201 SettableListenableFuture<WebSocketSession> connectFuture) { 202 203 this.transportUrl = url; 204 this.receiveHeaders = headers; 205 this.connectFuture = connectFuture; 206 this.sockJsSession = sockJsSession; 207 } 208 209 @Override 210 public void onBegin(Response response) { 211 if (response.getStatus() != 200) { 212 HttpStatus status = HttpStatus.valueOf(response.getStatus()); 213 response.abort(new HttpServerErrorException(status, "Unexpected XHR receive status")); 214 } 215 } 216 217 @Override 218 public void onHeaders(Response response) { 219 if (logger.isTraceEnabled()) { 220 // Convert to HttpHeaders to avoid "\n" 221 logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders())); 222 } 223 } 224 225 @Override 226 public void onContent(Response response, ByteBuffer buffer) { 227 while (true) { 228 if (this.sockJsSession.isDisconnected()) { 229 if (logger.isDebugEnabled()) { 230 logger.debug("SockJS sockJsSession closed, closing response."); 231 } 232 response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null)); 233 return; 234 } 235 if (buffer.remaining() == 0) { 236 break; 237 } 238 int b = buffer.get(); 239 if (b == '\n') { 240 handleFrame(); 241 } 242 else { 243 this.outputStream.write(b); 244 } 245 } 246 } 247 248 private void handleFrame() { 249 byte[] bytes = this.outputStream.toByteArray(); 250 this.outputStream.reset(); 251 String content = new String(bytes, SockJsFrame.CHARSET); 252 if (logger.isTraceEnabled()) { 253 logger.trace("XHR content received: " + content); 254 } 255 if (!PRELUDE.equals(content)) { 256 this.sockJsSession.handleFrame(new String(bytes, SockJsFrame.CHARSET)); 257 } 258 } 259 260 @Override 261 public void onSuccess(Response response) { 262 if (this.outputStream.size() > 0) { 263 handleFrame(); 264 } 265 if (logger.isTraceEnabled()) { 266 logger.trace("XHR receive request completed."); 267 } 268 executeReceiveRequest(this.transportUrl, this.receiveHeaders, this); 269 } 270 271 @Override 272 public void onFailure(Response response, Throwable failure) { 273 if (connectFuture.setException(failure)) { 274 return; 275 } 276 if (this.sockJsSession.isDisconnected()) { 277 this.sockJsSession.afterTransportClosed(null); 278 } 279 else { 280 this.sockJsSession.handleTransportError(failure); 281 this.sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage())); 282 } 283 } 284 } 285 286}