001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.client; 018 019import java.net.URI; 020import java.util.Collections; 021import java.util.List; 022import java.util.concurrent.atomic.AtomicBoolean; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026 027import org.springframework.context.Lifecycle; 028import org.springframework.lang.Nullable; 029import org.springframework.util.Assert; 030import org.springframework.util.concurrent.ListenableFuture; 031import org.springframework.util.concurrent.ListenableFutureCallback; 032import org.springframework.util.concurrent.SettableListenableFuture; 033import org.springframework.web.socket.CloseStatus; 034import org.springframework.web.socket.TextMessage; 035import org.springframework.web.socket.WebSocketHandler; 036import org.springframework.web.socket.WebSocketHttpHeaders; 037import org.springframework.web.socket.WebSocketSession; 038import org.springframework.web.socket.client.WebSocketClient; 039import org.springframework.web.socket.handler.TextWebSocketHandler; 040import org.springframework.web.socket.sockjs.transport.TransportType; 041 042/** 043 * A SockJS {@link Transport} that uses a 044 * {@link org.springframework.web.socket.client.WebSocketClient WebSocketClient}. 045 * 046 * @author Rossen Stoyanchev 047 * @since 4.1 048 */ 049public class WebSocketTransport implements Transport, Lifecycle { 050 051 private static final Log logger = LogFactory.getLog(WebSocketTransport.class); 052 053 private final WebSocketClient webSocketClient; 054 055 private volatile boolean running = false; 056 057 058 public WebSocketTransport(WebSocketClient webSocketClient) { 059 Assert.notNull(webSocketClient, "WebSocketClient is required"); 060 this.webSocketClient = webSocketClient; 061 } 062 063 064 /** 065 * Return the configured {@code WebSocketClient}. 066 */ 067 public WebSocketClient getWebSocketClient() { 068 return this.webSocketClient; 069 } 070 071 @Override 072 public List<TransportType> getTransportTypes() { 073 return Collections.singletonList(TransportType.WEBSOCKET); 074 } 075 076 @Override 077 public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) { 078 final SettableListenableFuture<WebSocketSession> future = new SettableListenableFuture<>(); 079 WebSocketClientSockJsSession session = new WebSocketClientSockJsSession(request, handler, future); 080 handler = new ClientSockJsWebSocketHandler(session); 081 request.addTimeoutTask(session.getTimeoutTask()); 082 083 URI url = request.getTransportUrl(); 084 WebSocketHttpHeaders headers = new WebSocketHttpHeaders(request.getHandshakeHeaders()); 085 if (logger.isDebugEnabled()) { 086 logger.debug("Starting WebSocket session on " + url); 087 } 088 this.webSocketClient.doHandshake(handler, headers, url).addCallback( 089 new ListenableFutureCallback<WebSocketSession>() { 090 @Override 091 public void onSuccess(@Nullable WebSocketSession webSocketSession) { 092 // WebSocket session ready, SockJS Session not yet 093 } 094 @Override 095 public void onFailure(Throwable ex) { 096 future.setException(ex); 097 } 098 }); 099 return future; 100 } 101 102 103 @Override 104 public void start() { 105 if (!isRunning()) { 106 if (this.webSocketClient instanceof Lifecycle) { 107 ((Lifecycle) this.webSocketClient).start(); 108 } 109 else { 110 this.running = true; 111 } 112 } 113 } 114 115 @Override 116 public void stop() { 117 if (isRunning()) { 118 if (this.webSocketClient instanceof Lifecycle) { 119 ((Lifecycle) this.webSocketClient).stop(); 120 } 121 else { 122 this.running = false; 123 } 124 } 125 } 126 127 @Override 128 public boolean isRunning() { 129 if (this.webSocketClient instanceof Lifecycle) { 130 return ((Lifecycle) this.webSocketClient).isRunning(); 131 } 132 else { 133 return this.running; 134 } 135 } 136 137 138 @Override 139 public String toString() { 140 return "WebSocketTransport[client=" + this.webSocketClient + "]"; 141 } 142 143 144 private static class ClientSockJsWebSocketHandler extends TextWebSocketHandler { 145 146 private final WebSocketClientSockJsSession sockJsSession; 147 148 private final AtomicBoolean connected = new AtomicBoolean(false); 149 150 public ClientSockJsWebSocketHandler(WebSocketClientSockJsSession session) { 151 Assert.notNull(session, "Session must not be null"); 152 this.sockJsSession = session; 153 } 154 155 @Override 156 public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception { 157 Assert.state(this.connected.compareAndSet(false, true), "Already connected"); 158 this.sockJsSession.initializeDelegateSession(webSocketSession); 159 } 160 161 @Override 162 public void handleTextMessage(WebSocketSession webSocketSession, TextMessage message) throws Exception { 163 this.sockJsSession.handleFrame(message.getPayload()); 164 } 165 166 @Override 167 public void handleTransportError(WebSocketSession webSocketSession, Throwable ex) throws Exception { 168 this.sockJsSession.handleTransportError(ex); 169 } 170 171 @Override 172 public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus status) throws Exception { 173 this.sockJsSession.afterTransportClosed(status); 174 } 175 } 176 177}