001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.client.jetty;
018
019import java.net.URI;
020import java.security.Principal;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.Callable;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026
027import org.eclipse.jetty.websocket.api.Session;
028import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
029import org.eclipse.jetty.websocket.client.WebSocketClient;
030
031import org.springframework.context.Lifecycle;
032import org.springframework.core.task.AsyncListenableTaskExecutor;
033import org.springframework.core.task.SimpleAsyncTaskExecutor;
034import org.springframework.core.task.TaskExecutor;
035import org.springframework.http.HttpHeaders;
036import org.springframework.lang.Nullable;
037import org.springframework.util.concurrent.ListenableFuture;
038import org.springframework.util.concurrent.ListenableFutureTask;
039import org.springframework.web.socket.WebSocketExtension;
040import org.springframework.web.socket.WebSocketHandler;
041import org.springframework.web.socket.WebSocketSession;
042import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
043import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession;
044import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter;
045import org.springframework.web.socket.client.AbstractWebSocketClient;
046import org.springframework.web.util.UriComponents;
047import org.springframework.web.util.UriComponentsBuilder;
048
049/**
050 * Initiates WebSocket requests to a WebSocket server programmatically
051 * through the Jetty WebSocket API.
052 *
053 * <p>As of 4.1 this class implements {@link Lifecycle} rather than
054 * {@link org.springframework.context.SmartLifecycle}. Use
055 * {@link org.springframework.web.socket.client.WebSocketConnectionManager
056 * WebSocketConnectionManager} instead to auto-start a WebSocket connection.
057 *
058 * @author Rossen Stoyanchev
059 * @since 4.0
060 */
061public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle {
062
063        private final org.eclipse.jetty.websocket.client.WebSocketClient client;
064
065        @Nullable
066        private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
067
068
069        /**
070         * Default constructor that creates an instance of
071         * {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
072         */
073        public JettyWebSocketClient() {
074                this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
075        }
076
077        /**
078         * Constructor that accepts an existing
079         * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
080         */
081        public JettyWebSocketClient(WebSocketClient client) {
082                this.client = client;
083        }
084
085
086        /**
087         * Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
088         * If this property is set to {@code null}, calls to any of the
089         * {@code doHandshake} methods will block until the connection is established.
090         * <p>By default an instance of {@code SimpleAsyncTaskExecutor} is used.
091         */
092        public void setTaskExecutor(@Nullable AsyncListenableTaskExecutor taskExecutor) {
093                this.taskExecutor = taskExecutor;
094        }
095
096        /**
097         * Return the configured {@link TaskExecutor}.
098         */
099        @Nullable
100        public AsyncListenableTaskExecutor getTaskExecutor() {
101                return this.taskExecutor;
102        }
103
104
105        @Override
106        public void start() {
107                try {
108                        this.client.start();
109                }
110                catch (Exception ex) {
111                        throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex);
112                }
113        }
114
115        @Override
116        public void stop() {
117                try {
118                        this.client.stop();
119                }
120                catch (Exception ex) {
121                        logger.error("Failed to stop Jetty WebSocketClient", ex);
122                }
123        }
124
125        @Override
126        public boolean isRunning() {
127                return this.client.isStarted();
128        }
129
130
131        @Override
132        public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
133                        String uriTemplate, Object... uriVars) {
134
135                UriComponents uriComponents = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode();
136                return doHandshake(webSocketHandler, null, uriComponents.toUri());
137        }
138
139        @Override
140        public ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler wsHandler,
141                        HttpHeaders headers, final URI uri, List<String> protocols,
142                        List<WebSocketExtension> extensions,  Map<String, Object> attributes) {
143
144                final ClientUpgradeRequest request = new ClientUpgradeRequest();
145                request.setSubProtocols(protocols);
146
147                for (WebSocketExtension e : extensions) {
148                        request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e));
149                }
150
151                headers.forEach(request::setHeader);
152
153                Principal user = getUser();
154                final JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user);
155                final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
156
157                Callable<WebSocketSession> connectTask = () -> {
158                        Future<Session> future = this.client.connect(listener, uri, request);
159                        future.get(this.client.getConnectTimeout() + 2000, TimeUnit.MILLISECONDS);
160                        return wsSession;
161                };
162
163                if (this.taskExecutor != null) {
164                        return this.taskExecutor.submitListenable(connectTask);
165                }
166                else {
167                        ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<>(connectTask);
168                        task.run();
169                        return task;
170                }
171        }
172
173        /**
174         * Return the user to make available through {@link WebSocketSession#getPrincipal()}.
175         * By default this method returns {@code null}
176         */
177        @Nullable
178        protected Principal getUser() {
179                return null;
180        }
181
182}