001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.client; 018 019import java.net.URI; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.eclipse.jetty.websocket.api.Session; 024import org.eclipse.jetty.websocket.api.UpgradeRequest; 025import org.eclipse.jetty.websocket.api.UpgradeResponse; 026import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; 027import org.eclipse.jetty.websocket.client.io.UpgradeListener; 028import reactor.core.publisher.Mono; 029import reactor.core.publisher.MonoProcessor; 030 031import org.springframework.context.Lifecycle; 032import org.springframework.core.io.buffer.DataBufferFactory; 033import org.springframework.core.io.buffer.DefaultDataBufferFactory; 034import org.springframework.http.HttpHeaders; 035import org.springframework.web.reactive.socket.HandshakeInfo; 036import org.springframework.web.reactive.socket.WebSocketHandler; 037import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; 038import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; 039 040/** 041 * A {@link WebSocketClient} implementation for use with Jetty 042 * {@link org.eclipse.jetty.websocket.client.WebSocketClient}. 043 * 044 * <p><strong>Note: </strong> the Jetty {@code WebSocketClient} requires 045 * lifecycle management and must be started and stopped. This is automatically 046 * managed when this class is declared as a Spring bean and created with the 047 * default constructor. See constructor notes for more details. 048 * 049 * @author Violeta Georgieva 050 * @author Rossen Stoyanchev 051 * @since 5.0 052 */ 053public class JettyWebSocketClient implements WebSocketClient, Lifecycle { 054 055 private static final Log logger = LogFactory.getLog(JettyWebSocketClient.class); 056 057 058 private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient; 059 060 private final boolean externallyManaged; 061 062 private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); 063 064 065 /** 066 * Default constructor that creates and manages an instance of a Jetty 067 * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}. 068 * The instance can be obtained with {@link #getJettyClient()} for further 069 * configuration. 070 * 071 * <p><strong>Note: </strong> When this constructor is used {@link Lifecycle} 072 * methods of this class are delegated to the Jetty {@code WebSocketClient}. 073 */ 074 public JettyWebSocketClient() { 075 this.jettyClient = new org.eclipse.jetty.websocket.client.WebSocketClient(); 076 this.externallyManaged = false; 077 } 078 079 /** 080 * Constructor that accepts an existing instance of a Jetty 081 * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}. 082 * 083 * <p><strong>Note: </strong> Use of this constructor implies the Jetty 084 * {@code WebSocketClient} is externally managed and hence {@link Lifecycle} 085 * methods of this class are not delegated to it. 086 */ 087 public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient) { 088 this.jettyClient = jettyClient; 089 this.externallyManaged = true; 090 } 091 092 093 /** 094 * Return the underlying Jetty {@code WebSocketClient}. 095 */ 096 public org.eclipse.jetty.websocket.client.WebSocketClient getJettyClient() { 097 return this.jettyClient; 098 } 099 100 101 @Override 102 public void start() { 103 if (!this.externallyManaged) { 104 try { 105 this.jettyClient.start(); 106 } 107 catch (Exception ex) { 108 throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex); 109 } 110 } 111 } 112 113 @Override 114 public void stop() { 115 if (!this.externallyManaged) { 116 try { 117 this.jettyClient.stop(); 118 } 119 catch (Exception ex) { 120 throw new IllegalStateException("Error stopping Jetty WebSocketClient", ex); 121 } 122 } 123 } 124 125 @Override 126 public boolean isRunning() { 127 return this.jettyClient.isRunning(); 128 } 129 130 131 @Override 132 public Mono<Void> execute(URI url, WebSocketHandler handler) { 133 return execute(url, new HttpHeaders(), handler); 134 } 135 136 @Override 137 public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) { 138 return executeInternal(url, headers, handler); 139 } 140 141 private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { 142 MonoProcessor<Void> completionMono = MonoProcessor.create(); 143 return Mono.fromCallable( 144 () -> { 145 if (logger.isDebugEnabled()) { 146 logger.debug("Connecting to " + url); 147 } 148 Object jettyHandler = createHandler(url, handler, completionMono); 149 ClientUpgradeRequest request = new ClientUpgradeRequest(); 150 request.setSubProtocols(handler.getSubProtocols()); 151 UpgradeListener upgradeListener = new DefaultUpgradeListener(headers); 152 return this.jettyClient.connect(jettyHandler, url, request, upgradeListener); 153 }) 154 .then(completionMono); 155 } 156 157 private Object createHandler(URI url, WebSocketHandler handler, MonoProcessor<Void> completion) { 158 return new JettyWebSocketHandlerAdapter(handler, session -> { 159 HandshakeInfo info = createHandshakeInfo(url, session); 160 return new JettyWebSocketSession(session, info, this.bufferFactory, completion); 161 }); 162 } 163 164 private HandshakeInfo createHandshakeInfo(URI url, Session jettySession) { 165 HttpHeaders headers = new HttpHeaders(); 166 jettySession.getUpgradeResponse().getHeaders().forEach(headers::put); 167 String protocol = headers.getFirst("Sec-WebSocket-Protocol"); 168 return new HandshakeInfo(url, headers, Mono.empty(), protocol); 169 } 170 171 172 private static class DefaultUpgradeListener implements UpgradeListener { 173 174 private final HttpHeaders headers; 175 176 177 public DefaultUpgradeListener(HttpHeaders headers) { 178 this.headers = headers; 179 } 180 181 @Override 182 public void onHandshakeRequest(UpgradeRequest request) { 183 this.headers.forEach(request::setHeader); 184 } 185 186 @Override 187 public void onHandshakeResponse(UpgradeResponse response) { 188 } 189 } 190 191}