001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.socket.sockjs.client; 018 019import java.net.URI; 020import java.util.Collections; 021import java.util.List; 022import java.util.concurrent.atomic.AtomicBoolean; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026 027import org.springframework.context.Lifecycle; 028import org.springframework.util.Assert; 029import org.springframework.util.concurrent.ListenableFuture; 030import org.springframework.util.concurrent.ListenableFutureCallback; 031import org.springframework.util.concurrent.SettableListenableFuture; 032import org.springframework.web.socket.CloseStatus; 033import org.springframework.web.socket.TextMessage; 034import org.springframework.web.socket.WebSocketHandler; 035import org.springframework.web.socket.WebSocketHttpHeaders; 036import org.springframework.web.socket.WebSocketSession; 037import org.springframework.web.socket.client.WebSocketClient; 038import org.springframework.web.socket.handler.TextWebSocketHandler; 039import org.springframework.web.socket.sockjs.transport.TransportType; 040 041/** 042 * A SockJS {@link Transport} that uses a 043 * {@link org.springframework.web.socket.client.WebSocketClient WebSocketClient}. 044 * 045 * @author Rossen Stoyanchev 046 * @since 4.1 047 */ 048public class WebSocketTransport implements Transport, Lifecycle { 049 050 private static final Log logger = LogFactory.getLog(WebSocketTransport.class); 051 052 private final WebSocketClient webSocketClient; 053 054 private volatile boolean running = false; 055 056 057 public WebSocketTransport(WebSocketClient webSocketClient) { 058 Assert.notNull(webSocketClient, "WebSocketClient is required"); 059 this.webSocketClient = webSocketClient; 060 } 061 062 063 /** 064 * Return the configured {@code WebSocketClient}. 065 */ 066 public WebSocketClient getWebSocketClient() { 067 return this.webSocketClient; 068 } 069 070 @Override 071 public List<TransportType> getTransportTypes() { 072 return Collections.singletonList(TransportType.WEBSOCKET); 073 } 074 075 @Override 076 public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) { 077 final SettableListenableFuture<WebSocketSession> future = new SettableListenableFuture<WebSocketSession>(); 078 WebSocketClientSockJsSession session = new WebSocketClientSockJsSession(request, handler, future); 079 handler = new ClientSockJsWebSocketHandler(session); 080 request.addTimeoutTask(session.getTimeoutTask()); 081 082 URI url = request.getTransportUrl(); 083 WebSocketHttpHeaders headers = new WebSocketHttpHeaders(request.getHandshakeHeaders()); 084 if (logger.isDebugEnabled()) { 085 logger.debug("Starting WebSocket session on " + url); 086 } 087 this.webSocketClient.doHandshake(handler, headers, url).addCallback( 088 new ListenableFutureCallback<WebSocketSession>() { 089 @Override 090 public void onSuccess(WebSocketSession webSocketSession) { 091 // WebSocket session ready, SockJS Session not yet 092 } 093 @Override 094 public void onFailure(Throwable ex) { 095 future.setException(ex); 096 } 097 }); 098 return future; 099 } 100 101 102 @Override 103 public void start() { 104 if (!isRunning()) { 105 if (this.webSocketClient instanceof Lifecycle) { 106 ((Lifecycle) this.webSocketClient).start(); 107 } 108 else { 109 this.running = true; 110 } 111 } 112 } 113 114 @Override 115 public void stop() { 116 if (isRunning()) { 117 if (this.webSocketClient instanceof Lifecycle) { 118 ((Lifecycle) this.webSocketClient).stop(); 119 } 120 else { 121 this.running = false; 122 } 123 } 124 } 125 126 @Override 127 public boolean isRunning() { 128 if (this.webSocketClient instanceof Lifecycle) { 129 return ((Lifecycle) this.webSocketClient).isRunning(); 130 } 131 else { 132 return this.running; 133 } 134 } 135 136 137 @Override 138 public String toString() { 139 return "WebSocketTransport[client=" + this.webSocketClient + "]"; 140 } 141 142 143 private static class ClientSockJsWebSocketHandler extends TextWebSocketHandler { 144 145 private final WebSocketClientSockJsSession sockJsSession; 146 147 private final AtomicBoolean connected = new AtomicBoolean(false); 148 149 public ClientSockJsWebSocketHandler(WebSocketClientSockJsSession session) { 150 Assert.notNull(session, "Session must not be null"); 151 this.sockJsSession = session; 152 } 153 154 @Override 155 public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception { 156 Assert.state(this.connected.compareAndSet(false, true), "Already connected"); 157 this.sockJsSession.initializeDelegateSession(webSocketSession); 158 } 159 160 @Override 161 public void handleTextMessage(WebSocketSession webSocketSession, TextMessage message) throws Exception { 162 this.sockJsSession.handleFrame(message.getPayload()); 163 } 164 165 @Override 166 public void handleTransportError(WebSocketSession webSocketSession, Throwable ex) throws Exception { 167 this.sockJsSession.handleTransportError(ex); 168 } 169 170 @Override 171 public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus status) throws Exception { 172 this.sockJsSession.afterTransportClosed(status); 173 } 174 } 175 176}