001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.client.jetty; 018 019import java.net.URI; 020import java.security.Principal; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.Callable; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026 027import org.eclipse.jetty.websocket.api.Session; 028import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; 029import org.eclipse.jetty.websocket.client.WebSocketClient; 030 031import org.springframework.context.Lifecycle; 032import org.springframework.core.task.AsyncListenableTaskExecutor; 033import org.springframework.core.task.SimpleAsyncTaskExecutor; 034import org.springframework.core.task.TaskExecutor; 035import org.springframework.http.HttpHeaders; 036import org.springframework.lang.Nullable; 037import org.springframework.util.concurrent.ListenableFuture; 038import org.springframework.util.concurrent.ListenableFutureTask; 039import org.springframework.web.socket.WebSocketExtension; 040import org.springframework.web.socket.WebSocketHandler; 041import org.springframework.web.socket.WebSocketSession; 042import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter; 043import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession; 044import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter; 045import org.springframework.web.socket.client.AbstractWebSocketClient; 046import org.springframework.web.util.UriComponents; 047import org.springframework.web.util.UriComponentsBuilder; 048 049/** 050 * Initiates WebSocket requests to a WebSocket server programmatically 051 * through the Jetty WebSocket API. 052 * 053 * <p>As of 4.1 this class implements {@link Lifecycle} rather than 054 * {@link org.springframework.context.SmartLifecycle}. Use 055 * {@link org.springframework.web.socket.client.WebSocketConnectionManager 056 * WebSocketConnectionManager} instead to auto-start a WebSocket connection. 057 * 058 * @author Rossen Stoyanchev 059 * @since 4.0 060 */ 061public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle { 062 063 private final org.eclipse.jetty.websocket.client.WebSocketClient client; 064 065 @Nullable 066 private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); 067 068 069 /** 070 * Default constructor that creates an instance of 071 * {@link org.eclipse.jetty.websocket.client.WebSocketClient}. 072 */ 073 public JettyWebSocketClient() { 074 this.client = new org.eclipse.jetty.websocket.client.WebSocketClient(); 075 } 076 077 /** 078 * Constructor that accepts an existing 079 * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance. 080 */ 081 public JettyWebSocketClient(WebSocketClient client) { 082 this.client = client; 083 } 084 085 086 /** 087 * Set an {@link AsyncListenableTaskExecutor} to use when opening connections. 088 * If this property is set to {@code null}, calls to any of the 089 * {@code doHandshake} methods will block until the connection is established. 090 * <p>By default an instance of {@code SimpleAsyncTaskExecutor} is used. 091 */ 092 public void setTaskExecutor(@Nullable AsyncListenableTaskExecutor taskExecutor) { 093 this.taskExecutor = taskExecutor; 094 } 095 096 /** 097 * Return the configured {@link TaskExecutor}. 098 */ 099 @Nullable 100 public AsyncListenableTaskExecutor getTaskExecutor() { 101 return this.taskExecutor; 102 } 103 104 105 @Override 106 public void start() { 107 try { 108 this.client.start(); 109 } 110 catch (Exception ex) { 111 throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex); 112 } 113 } 114 115 @Override 116 public void stop() { 117 try { 118 this.client.stop(); 119 } 120 catch (Exception ex) { 121 logger.error("Failed to stop Jetty WebSocketClient", ex); 122 } 123 } 124 125 @Override 126 public boolean isRunning() { 127 return this.client.isStarted(); 128 } 129 130 131 @Override 132 public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler, 133 String uriTemplate, Object... uriVars) { 134 135 UriComponents uriComponents = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode(); 136 return doHandshake(webSocketHandler, null, uriComponents.toUri()); 137 } 138 139 @Override 140 public ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler wsHandler, 141 HttpHeaders headers, final URI uri, List<String> protocols, 142 List<WebSocketExtension> extensions, Map<String, Object> attributes) { 143 144 final ClientUpgradeRequest request = new ClientUpgradeRequest(); 145 request.setSubProtocols(protocols); 146 147 for (WebSocketExtension e : extensions) { 148 request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e)); 149 } 150 151 headers.forEach(request::setHeader); 152 153 Principal user = getUser(); 154 final JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user); 155 final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession); 156 157 Callable<WebSocketSession> connectTask = () -> { 158 Future<Session> future = this.client.connect(listener, uri, request); 159 future.get(this.client.getConnectTimeout() + 2000, TimeUnit.MILLISECONDS); 160 return wsSession; 161 }; 162 163 if (this.taskExecutor != null) { 164 return this.taskExecutor.submitListenable(connectTask); 165 } 166 else { 167 ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<>(connectTask); 168 task.run(); 169 return task; 170 } 171 } 172 173 /** 174 * Return the user to make available through {@link WebSocketSession#getPrincipal()}. 175 * By default this method returns {@code null} 176 */ 177 @Nullable 178 protected Principal getUser() { 179 return null; 180 } 181 182}