001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.sockjs.client;
018
019import java.net.URI;
020import java.util.Collections;
021import java.util.List;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026
027import org.springframework.context.Lifecycle;
028import org.springframework.lang.Nullable;
029import org.springframework.util.Assert;
030import org.springframework.util.concurrent.ListenableFuture;
031import org.springframework.util.concurrent.ListenableFutureCallback;
032import org.springframework.util.concurrent.SettableListenableFuture;
033import org.springframework.web.socket.CloseStatus;
034import org.springframework.web.socket.TextMessage;
035import org.springframework.web.socket.WebSocketHandler;
036import org.springframework.web.socket.WebSocketHttpHeaders;
037import org.springframework.web.socket.WebSocketSession;
038import org.springframework.web.socket.client.WebSocketClient;
039import org.springframework.web.socket.handler.TextWebSocketHandler;
040import org.springframework.web.socket.sockjs.transport.TransportType;
041
042/**
043 * A SockJS {@link Transport} that uses a
044 * {@link org.springframework.web.socket.client.WebSocketClient WebSocketClient}.
045 *
046 * @author Rossen Stoyanchev
047 * @since 4.1
048 */
049public class WebSocketTransport implements Transport, Lifecycle {
050
051        private static final Log logger = LogFactory.getLog(WebSocketTransport.class);
052
053        private final WebSocketClient webSocketClient;
054
055        private volatile boolean running = false;
056
057
058        public WebSocketTransport(WebSocketClient webSocketClient) {
059                Assert.notNull(webSocketClient, "WebSocketClient is required");
060                this.webSocketClient = webSocketClient;
061        }
062
063
064        /**
065         * Return the configured {@code WebSocketClient}.
066         */
067        public WebSocketClient getWebSocketClient() {
068                return this.webSocketClient;
069        }
070
071        @Override
072        public List<TransportType> getTransportTypes() {
073                return Collections.singletonList(TransportType.WEBSOCKET);
074        }
075
076        @Override
077        public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
078                final SettableListenableFuture<WebSocketSession> future = new SettableListenableFuture<>();
079                WebSocketClientSockJsSession session = new WebSocketClientSockJsSession(request, handler, future);
080                handler = new ClientSockJsWebSocketHandler(session);
081                request.addTimeoutTask(session.getTimeoutTask());
082
083                URI url = request.getTransportUrl();
084                WebSocketHttpHeaders headers = new WebSocketHttpHeaders(request.getHandshakeHeaders());
085                if (logger.isDebugEnabled()) {
086                        logger.debug("Starting WebSocket session on " + url);
087                }
088                this.webSocketClient.doHandshake(handler, headers, url).addCallback(
089                                new ListenableFutureCallback<WebSocketSession>() {
090                                        @Override
091                                        public void onSuccess(@Nullable WebSocketSession webSocketSession) {
092                                                // WebSocket session ready, SockJS Session not yet
093                                        }
094                                        @Override
095                                        public void onFailure(Throwable ex) {
096                                                future.setException(ex);
097                                        }
098                                });
099                return future;
100        }
101
102
103        @Override
104        public void start() {
105                if (!isRunning()) {
106                        if (this.webSocketClient instanceof Lifecycle) {
107                                ((Lifecycle) this.webSocketClient).start();
108                        }
109                        else {
110                                this.running = true;
111                        }
112                }
113        }
114
115        @Override
116        public void stop() {
117                if (isRunning()) {
118                        if (this.webSocketClient instanceof Lifecycle) {
119                                ((Lifecycle) this.webSocketClient).stop();
120                        }
121                        else {
122                                this.running = false;
123                        }
124                }
125        }
126
127        @Override
128        public boolean isRunning() {
129                if (this.webSocketClient instanceof Lifecycle) {
130                        return ((Lifecycle) this.webSocketClient).isRunning();
131                }
132                else {
133                        return this.running;
134                }
135        }
136
137
138        @Override
139        public String toString() {
140                return "WebSocketTransport[client=" + this.webSocketClient + "]";
141        }
142
143
144        private static class ClientSockJsWebSocketHandler extends TextWebSocketHandler {
145
146                private final WebSocketClientSockJsSession sockJsSession;
147
148                private final AtomicBoolean connected = new AtomicBoolean(false);
149
150                public ClientSockJsWebSocketHandler(WebSocketClientSockJsSession session) {
151                        Assert.notNull(session, "Session must not be null");
152                        this.sockJsSession = session;
153                }
154
155                @Override
156                public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
157                        Assert.state(this.connected.compareAndSet(false, true), "Already connected");
158                        this.sockJsSession.initializeDelegateSession(webSocketSession);
159                }
160
161                @Override
162                public void handleTextMessage(WebSocketSession webSocketSession, TextMessage message) throws Exception {
163                        this.sockJsSession.handleFrame(message.getPayload());
164                }
165
166                @Override
167                public void handleTransportError(WebSocketSession webSocketSession, Throwable ex) throws Exception {
168                        this.sockJsSession.handleTransportError(ex);
169                }
170
171                @Override
172                public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus status) throws Exception {
173                        this.sockJsSession.afterTransportClosed(status);
174                }
175        }
176
177}