001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.client.jetty;
018
019import java.net.URI;
020import java.security.Principal;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.Callable;
024import java.util.concurrent.Future;
025
026import org.eclipse.jetty.websocket.api.Session;
027import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
028import org.eclipse.jetty.websocket.client.WebSocketClient;
029
030import org.springframework.context.Lifecycle;
031import org.springframework.core.task.AsyncListenableTaskExecutor;
032import org.springframework.core.task.SimpleAsyncTaskExecutor;
033import org.springframework.core.task.TaskExecutor;
034import org.springframework.http.HttpHeaders;
035import org.springframework.util.concurrent.ListenableFuture;
036import org.springframework.util.concurrent.ListenableFutureTask;
037import org.springframework.web.socket.WebSocketExtension;
038import org.springframework.web.socket.WebSocketHandler;
039import org.springframework.web.socket.WebSocketSession;
040import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
041import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession;
042import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter;
043import org.springframework.web.socket.client.AbstractWebSocketClient;
044import org.springframework.web.util.UriComponents;
045import org.springframework.web.util.UriComponentsBuilder;
046
047/**
048 * Initiates WebSocket requests to a WebSocket server programmatically
049 * through the Jetty WebSocket API.
050 *
051 * <p>As of 4.1 this class implements {@link Lifecycle} rather than
052 * {@link org.springframework.context.SmartLifecycle}. Use
053 * {@link org.springframework.web.socket.client.WebSocketConnectionManager
054 * WebSocketConnectionManager} instead to auto-start a WebSocket connection.
055 *
056 * @author Rossen Stoyanchev
057 * @since 4.0
058 */
059public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle {
060
061        private final org.eclipse.jetty.websocket.client.WebSocketClient client;
062
063        private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
064
065
066        /**
067         * Default constructor that creates an instance of
068         * {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
069         */
070        public JettyWebSocketClient() {
071                this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
072        }
073
074        /**
075         * Constructor that accepts an existing
076         * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
077         */
078        public JettyWebSocketClient(WebSocketClient client) {
079                this.client = client;
080        }
081
082
083        /**
084         * Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
085         * If this property is set to {@code null}, calls to any of the
086         * {@code doHandshake} methods will block until the connection is established.
087         * <p>By default an instance of {@code SimpleAsyncTaskExecutor} is used.
088         */
089        public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
090                this.taskExecutor = taskExecutor;
091        }
092
093        /**
094         * Return the configured {@link TaskExecutor}.
095         */
096        public AsyncListenableTaskExecutor getTaskExecutor() {
097                return this.taskExecutor;
098        }
099
100
101        @Override
102        public void start() {
103                try {
104                        this.client.start();
105                }
106                catch (Exception ex) {
107                        throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex);
108                }
109        }
110
111        @Override
112        public void stop() {
113                try {
114                        this.client.stop();
115                }
116                catch (Exception ex) {
117                        logger.error("Failed to stop Jetty WebSocketClient", ex);
118                }
119        }
120
121        @Override
122        public boolean isRunning() {
123                return this.client.isStarted();
124        }
125
126
127        @Override
128        public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
129                        String uriTemplate, Object... uriVars) {
130
131                UriComponents uriComponents = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode();
132                return doHandshake(webSocketHandler, null, uriComponents.toUri());
133        }
134
135        @Override
136        public ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler wsHandler,
137                        HttpHeaders headers, final URI uri, List<String> protocols,
138                        List<WebSocketExtension> extensions,  Map<String, Object> attributes) {
139
140                final ClientUpgradeRequest request = new ClientUpgradeRequest();
141                request.setSubProtocols(protocols);
142
143                for (WebSocketExtension e : extensions) {
144                        request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e));
145                }
146
147                for (String header : headers.keySet()) {
148                        request.setHeader(header, headers.get(header));
149                }
150
151                Principal user = getUser();
152                final JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user);
153                final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
154
155                Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
156                        @Override
157                        public WebSocketSession call() throws Exception {
158                                Future<Session> future = client.connect(listener, uri, request);
159                                future.get();
160                                return wsSession;
161                        }
162                };
163
164                if (this.taskExecutor != null) {
165                        return this.taskExecutor.submitListenable(connectTask);
166                }
167                else {
168                        ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
169                        task.run();
170                        return task;
171                }
172        }
173
174        /**
175         * @return the user to make available through {@link WebSocketSession#getPrincipal()};
176         *      by default this method returns {@code null}
177         */
178        protected Principal getUser() {
179                return null;
180        }
181
182}