001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.reactive.socket.client;
018
019import java.net.URI;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.eclipse.jetty.websocket.api.Session;
024import org.eclipse.jetty.websocket.api.UpgradeRequest;
025import org.eclipse.jetty.websocket.api.UpgradeResponse;
026import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
027import org.eclipse.jetty.websocket.client.io.UpgradeListener;
028import reactor.core.publisher.Mono;
029import reactor.core.publisher.MonoProcessor;
030
031import org.springframework.context.Lifecycle;
032import org.springframework.core.io.buffer.DataBufferFactory;
033import org.springframework.core.io.buffer.DefaultDataBufferFactory;
034import org.springframework.http.HttpHeaders;
035import org.springframework.web.reactive.socket.HandshakeInfo;
036import org.springframework.web.reactive.socket.WebSocketHandler;
037import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
038import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
039
040/**
041 * A {@link WebSocketClient} implementation for use with Jetty
042 * {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
043 *
044 * <p><strong>Note: </strong> the Jetty {@code WebSocketClient} requires
045 * lifecycle management and must be started and stopped. This is automatically
046 * managed when this class is declared as a Spring bean and created with the
047 * default constructor. See constructor notes for more details.
048 *
049 * @author Violeta Georgieva
050 * @author Rossen Stoyanchev
051 * @since 5.0
052 */
053public class JettyWebSocketClient implements WebSocketClient, Lifecycle {
054
055        private static final Log logger = LogFactory.getLog(JettyWebSocketClient.class);
056
057
058        private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient;
059
060        private final boolean externallyManaged;
061
062        private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
063
064
065        /**
066         * Default constructor that creates and manages an instance of a Jetty
067         * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
068         * The instance can be obtained with {@link #getJettyClient()} for further
069         * configuration.
070         *
071         * <p><strong>Note: </strong> When this constructor is used {@link Lifecycle}
072         * methods of this class are delegated to the Jetty {@code WebSocketClient}.
073         */
074        public JettyWebSocketClient() {
075                this.jettyClient = new org.eclipse.jetty.websocket.client.WebSocketClient();
076                this.externallyManaged = false;
077        }
078
079        /**
080         * Constructor that accepts an existing instance of a Jetty
081         * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
082         *
083         * <p><strong>Note: </strong> Use of this constructor implies the Jetty
084         * {@code WebSocketClient} is externally managed and hence {@link Lifecycle}
085         * methods of this class are not delegated to it.
086         */
087        public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient) {
088                this.jettyClient = jettyClient;
089                this.externallyManaged = true;
090        }
091
092
093        /**
094         * Return the underlying Jetty {@code WebSocketClient}.
095         */
096        public org.eclipse.jetty.websocket.client.WebSocketClient getJettyClient() {
097                return this.jettyClient;
098        }
099
100
101        @Override
102        public void start() {
103                if (!this.externallyManaged) {
104                        try {
105                                this.jettyClient.start();
106                        }
107                        catch (Exception ex) {
108                                throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex);
109                        }
110                }
111        }
112
113        @Override
114        public void stop() {
115                if (!this.externallyManaged) {
116                        try {
117                                this.jettyClient.stop();
118                        }
119                        catch (Exception ex) {
120                                throw new IllegalStateException("Error stopping Jetty WebSocketClient", ex);
121                        }
122                }
123        }
124
125        @Override
126        public boolean isRunning() {
127                return this.jettyClient.isRunning();
128        }
129
130
131        @Override
132        public Mono<Void> execute(URI url, WebSocketHandler handler) {
133                return execute(url, new HttpHeaders(), handler);
134        }
135
136        @Override
137        public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
138                return executeInternal(url, headers, handler);
139        }
140
141        private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
142                MonoProcessor<Void> completionMono = MonoProcessor.create();
143                return Mono.fromCallable(
144                                () -> {
145                                        if (logger.isDebugEnabled()) {
146                                                logger.debug("Connecting to " + url);
147                                        }
148                                        Object jettyHandler = createHandler(url, handler, completionMono);
149                                        ClientUpgradeRequest request = new ClientUpgradeRequest();
150                                        request.setSubProtocols(handler.getSubProtocols());
151                                        UpgradeListener upgradeListener = new DefaultUpgradeListener(headers);
152                                        return this.jettyClient.connect(jettyHandler, url, request, upgradeListener);
153                                })
154                                .then(completionMono);
155        }
156
157        private Object createHandler(URI url, WebSocketHandler handler, MonoProcessor<Void> completion) {
158                return new JettyWebSocketHandlerAdapter(handler, session -> {
159                        HandshakeInfo info = createHandshakeInfo(url, session);
160                        return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
161                });
162        }
163
164        private HandshakeInfo createHandshakeInfo(URI url, Session jettySession) {
165                HttpHeaders headers = new HttpHeaders();
166                jettySession.getUpgradeResponse().getHeaders().forEach(headers::put);
167                String protocol = headers.getFirst("Sec-WebSocket-Protocol");
168                return new HandshakeInfo(url, headers, Mono.empty(), protocol);
169        }
170
171
172        private static class DefaultUpgradeListener implements UpgradeListener {
173
174                private final HttpHeaders headers;
175
176
177                public DefaultUpgradeListener(HttpHeaders headers) {
178                        this.headers = headers;
179                }
180
181                @Override
182                public void onHandshakeRequest(UpgradeRequest request) {
183                        this.headers.forEach(request::setHeader);
184                }
185
186                @Override
187                public void onHandshakeResponse(UpgradeResponse response) {
188                }
189        }
190
191}