001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.adapter; 018 019import java.io.IOException; 020import java.util.Queue; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import org.reactivestreams.Publisher; 024import org.reactivestreams.Subscriber; 025import org.reactivestreams.Subscription; 026import reactor.core.publisher.Flux; 027import reactor.core.publisher.Mono; 028import reactor.core.publisher.MonoProcessor; 029import reactor.util.concurrent.Queues; 030 031import org.springframework.core.io.buffer.DataBufferFactory; 032import org.springframework.http.server.reactive.AbstractListenerReadPublisher; 033import org.springframework.http.server.reactive.AbstractListenerWriteProcessor; 034import org.springframework.lang.Nullable; 035import org.springframework.util.Assert; 036import org.springframework.web.reactive.socket.CloseStatus; 037import org.springframework.web.reactive.socket.HandshakeInfo; 038import org.springframework.web.reactive.socket.WebSocketMessage; 039import org.springframework.web.reactive.socket.WebSocketMessage.Type; 040import org.springframework.web.reactive.socket.WebSocketSession; 041 042/** 043 * Base class for {@link WebSocketSession} implementations that bridge between 044 * event-listener WebSocket APIs (e.g. Java WebSocket API JSR-356, Jetty, 045 * Undertow) and Reactive Streams. 046 * 047 * <p>Also an implementation of {@code Subscriber<Void>} so it can be used as 048 * the completion subscriber for session handling 049 * 050 * @author Violeta Georgieva 051 * @author Rossen Stoyanchev 052 * @since 5.0 053 * @param <T> the native delegate type 054 */ 055public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T> 056 implements Subscriber<Void> { 057 058 /** 059 * The "back-pressure" buffer size to use if the underlying WebSocket API 060 * does not have flow control for receiving messages. 061 */ 062 private static final int RECEIVE_BUFFER_SIZE = 8192; 063 064 065 @Nullable 066 private final MonoProcessor<Void> completionMono; 067 068 private final WebSocketReceivePublisher receivePublisher; 069 070 @Nullable 071 private volatile WebSocketSendProcessor sendProcessor; 072 073 private final AtomicBoolean sendCalled = new AtomicBoolean(); 074 075 076 /** 077 * Base constructor. 078 * @param delegate the native WebSocket session, channel, or connection 079 * @param id the session id 080 * @param handshakeInfo the handshake info 081 * @param bufferFactory the DataBuffer factor for the current connection 082 */ 083 public AbstractListenerWebSocketSession( 084 T delegate, String id, HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory) { 085 086 this(delegate, id, handshakeInfo, bufferFactory, null); 087 } 088 089 /** 090 * Alternative constructor with completion {@code Mono<Void>} to propagate 091 * the session completion (success or error) (for client-side use). 092 */ 093 public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, 094 DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) { 095 096 super(delegate, id, info, bufferFactory); 097 this.receivePublisher = new WebSocketReceivePublisher(); 098 this.completionMono = completionMono; 099 } 100 101 102 protected WebSocketSendProcessor getSendProcessor() { 103 WebSocketSendProcessor sendProcessor = this.sendProcessor; 104 Assert.state(sendProcessor != null, "No WebSocketSendProcessor available"); 105 return sendProcessor; 106 } 107 108 @Override 109 public Flux<WebSocketMessage> receive() { 110 return (canSuspendReceiving() ? Flux.from(this.receivePublisher) : 111 Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE)); 112 } 113 114 @Override 115 public Mono<Void> send(Publisher<WebSocketMessage> messages) { 116 if (this.sendCalled.compareAndSet(false, true)) { 117 WebSocketSendProcessor sendProcessor = new WebSocketSendProcessor(); 118 this.sendProcessor = sendProcessor; 119 return Mono.from(subscriber -> { 120 messages.subscribe(sendProcessor); 121 sendProcessor.subscribe(subscriber); 122 }); 123 } 124 else { 125 return Mono.error(new IllegalStateException("send() has already been called")); 126 } 127 } 128 129 /** 130 * Whether the underlying WebSocket API has flow control and can suspend and 131 * resume the receiving of messages. 132 * <p><strong>Note:</strong> Sub-classes are encouraged to start out in 133 * suspended mode, if possible, and wait until demand is received. 134 */ 135 protected abstract boolean canSuspendReceiving(); 136 137 /** 138 * Suspend receiving until received message(s) are processed and more demand 139 * is generated by the downstream Subscriber. 140 * <p><strong>Note:</strong> if the underlying WebSocket API does not provide 141 * flow control for receiving messages, this method should be a no-op 142 * and {@link #canSuspendReceiving()} should return {@code false}. 143 */ 144 protected abstract void suspendReceiving(); 145 146 /** 147 * Resume receiving new message(s) after demand is generated by the 148 * downstream Subscriber. 149 * <p><strong>Note:</strong> if the underlying WebSocket API does not provide 150 * flow control for receiving messages, this method should be a no-op 151 * and {@link #canSuspendReceiving()} should return {@code false}. 152 */ 153 protected abstract void resumeReceiving(); 154 155 /** 156 * Send the given WebSocket message. 157 * <p><strong>Note:</strong> Sub-classes are responsible for releasing the 158 * payload data buffer, once fully written, if pooled buffers apply to the 159 * underlying container. 160 */ 161 protected abstract boolean sendMessage(WebSocketMessage message) throws IOException; 162 163 164 // WebSocketHandler adapter delegate methods 165 166 /** Handle a message callback from the WebSocketHandler adapter. */ 167 void handleMessage(Type type, WebSocketMessage message) { 168 this.receivePublisher.handleMessage(message); 169 } 170 171 /** Handle an error callback from the WebSocketHandler adapter. */ 172 void handleError(Throwable ex) { 173 this.receivePublisher.onError(ex); 174 WebSocketSendProcessor sendProcessor = this.sendProcessor; 175 if (sendProcessor != null) { 176 sendProcessor.cancel(); 177 sendProcessor.onError(ex); 178 } 179 } 180 181 /** Handle a close callback from the WebSocketHandler adapter. */ 182 void handleClose(CloseStatus reason) { 183 this.receivePublisher.onAllDataRead(); 184 WebSocketSendProcessor sendProcessor = this.sendProcessor; 185 if (sendProcessor != null) { 186 sendProcessor.cancel(); 187 sendProcessor.onComplete(); 188 } 189 } 190 191 192 // Subscriber<Void> implementation 193 194 @Override 195 public void onSubscribe(Subscription subscription) { 196 subscription.request(Long.MAX_VALUE); 197 } 198 199 @Override 200 public void onNext(Void aVoid) { 201 // no op 202 } 203 204 @Override 205 public void onError(Throwable ex) { 206 if (this.completionMono != null) { 207 this.completionMono.onError(ex); 208 } 209 int code = CloseStatus.SERVER_ERROR.getCode(); 210 close(new CloseStatus(code, ex.getMessage())); 211 } 212 213 @Override 214 public void onComplete() { 215 if (this.completionMono != null) { 216 this.completionMono.onComplete(); 217 } 218 close(); 219 } 220 221 222 private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> { 223 224 private volatile Queue<Object> pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get(); 225 226 227 WebSocketReceivePublisher() { 228 super(AbstractListenerWebSocketSession.this.getLogPrefix()); 229 } 230 231 232 @Override 233 protected void checkOnDataAvailable() { 234 resumeReceiving(); 235 int size = this.pendingMessages.size(); 236 if (rsReadLogger.isTraceEnabled()) { 237 rsReadLogger.trace(getLogPrefix() + "checkOnDataAvailable (" + size + " pending)"); 238 } 239 if (size > 0) { 240 onDataAvailable(); 241 } 242 } 243 244 @Override 245 protected void readingPaused() { 246 suspendReceiving(); 247 } 248 249 @Override 250 @Nullable 251 protected WebSocketMessage read() throws IOException { 252 return (WebSocketMessage) this.pendingMessages.poll(); 253 } 254 255 void handleMessage(WebSocketMessage message) { 256 if (logger.isTraceEnabled()) { 257 logger.trace(getLogPrefix() + "Received " + message); 258 } 259 else if (rsReadLogger.isTraceEnabled()) { 260 rsReadLogger.trace(getLogPrefix() + "Received " + message); 261 } 262 if (!this.pendingMessages.offer(message)) { 263 discardData(); 264 throw new IllegalStateException( 265 "Too many messages. Please ensure WebSocketSession.receive() is subscribed to."); 266 } 267 onDataAvailable(); 268 } 269 270 @Override 271 protected void discardData() { 272 while (true) { 273 WebSocketMessage message = (WebSocketMessage) this.pendingMessages.poll(); 274 if (message == null) { 275 return; 276 } 277 message.release(); 278 } 279 } 280 } 281 282 283 /** 284 * Processor to send web socket messages. 285 */ 286 protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> { 287 288 private volatile boolean isReady = true; 289 290 291 WebSocketSendProcessor() { 292 super(receivePublisher.getLogPrefix()); 293 } 294 295 296 @Override 297 protected boolean write(WebSocketMessage message) throws IOException { 298 if (logger.isTraceEnabled()) { 299 logger.trace(getLogPrefix() + "Sending " + message); 300 } 301 else if (rsWriteLogger.isTraceEnabled()) { 302 rsWriteLogger.trace(getLogPrefix() + "Sending " + message); 303 } 304 // In case of IOException, onError handling should call discardData(WebSocketMessage).. 305 return sendMessage(message); 306 } 307 308 @Override 309 protected boolean isDataEmpty(WebSocketMessage message) { 310 return (message.getPayload().readableByteCount() == 0); 311 } 312 313 @Override 314 protected boolean isWritePossible() { 315 return (this.isReady); 316 } 317 318 /** 319 * Sub-classes can invoke this before sending a message (false) and 320 * after receiving the async send callback (true) effective translating 321 * async completion callback into simple flow control. 322 */ 323 public void setReadyToSend(boolean ready) { 324 if (ready && rsWriteLogger.isTraceEnabled()) { 325 rsWriteLogger.trace(getLogPrefix() + "Ready to send"); 326 } 327 this.isReady = ready; 328 } 329 330 @Override 331 protected void discardData(WebSocketMessage message) { 332 message.release(); 333 } 334 } 335 336}