001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.reactive.socket.adapter;
018
019import java.io.IOException;
020import java.util.Queue;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import org.reactivestreams.Publisher;
024import org.reactivestreams.Subscriber;
025import org.reactivestreams.Subscription;
026import reactor.core.publisher.Flux;
027import reactor.core.publisher.Mono;
028import reactor.core.publisher.MonoProcessor;
029import reactor.util.concurrent.Queues;
030
031import org.springframework.core.io.buffer.DataBufferFactory;
032import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
033import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
034import org.springframework.lang.Nullable;
035import org.springframework.util.Assert;
036import org.springframework.web.reactive.socket.CloseStatus;
037import org.springframework.web.reactive.socket.HandshakeInfo;
038import org.springframework.web.reactive.socket.WebSocketMessage;
039import org.springframework.web.reactive.socket.WebSocketMessage.Type;
040import org.springframework.web.reactive.socket.WebSocketSession;
041
042/**
043 * Base class for {@link WebSocketSession} implementations that bridge between
044 * event-listener WebSocket APIs (e.g. Java WebSocket API JSR-356, Jetty,
045 * Undertow) and Reactive Streams.
046 *
047 * <p>Also an implementation of {@code Subscriber&lt;Void&gt;} so it can be used as
048 * the completion subscriber for session handling
049 *
050 * @author Violeta Georgieva
051 * @author Rossen Stoyanchev
052 * @since 5.0
053 * @param <T> the native delegate type
054 */
055public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T>
056                implements Subscriber<Void> {
057
058        /**
059         * The "back-pressure" buffer size to use if the underlying WebSocket API
060         * does not have flow control for receiving messages.
061         */
062        private static final int RECEIVE_BUFFER_SIZE = 8192;
063
064
065        @Nullable
066        private final MonoProcessor<Void> completionMono;
067
068        private final WebSocketReceivePublisher receivePublisher;
069
070        @Nullable
071        private volatile WebSocketSendProcessor sendProcessor;
072
073        private final AtomicBoolean sendCalled = new AtomicBoolean();
074
075
076        /**
077         * Base constructor.
078         * @param delegate the native WebSocket session, channel, or connection
079         * @param id the session id
080         * @param handshakeInfo the handshake info
081         * @param bufferFactory the DataBuffer factor for the current connection
082         */
083        public AbstractListenerWebSocketSession(
084                        T delegate, String id, HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory) {
085
086                this(delegate, id, handshakeInfo, bufferFactory, null);
087        }
088
089        /**
090         * Alternative constructor with completion {@code Mono&lt;Void&gt;} to propagate
091         * the session completion (success or error) (for client-side use).
092         */
093        public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
094                        DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) {
095
096                super(delegate, id, info, bufferFactory);
097                this.receivePublisher = new WebSocketReceivePublisher();
098                this.completionMono = completionMono;
099        }
100
101
102        protected WebSocketSendProcessor getSendProcessor() {
103                WebSocketSendProcessor sendProcessor = this.sendProcessor;
104                Assert.state(sendProcessor != null, "No WebSocketSendProcessor available");
105                return sendProcessor;
106        }
107
108        @Override
109        public Flux<WebSocketMessage> receive() {
110                return (canSuspendReceiving() ? Flux.from(this.receivePublisher) :
111                                Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE));
112        }
113
114        @Override
115        public Mono<Void> send(Publisher<WebSocketMessage> messages) {
116                if (this.sendCalled.compareAndSet(false, true)) {
117                        WebSocketSendProcessor sendProcessor = new WebSocketSendProcessor();
118                        this.sendProcessor = sendProcessor;
119                        return Mono.from(subscriber -> {
120                                        messages.subscribe(sendProcessor);
121                                        sendProcessor.subscribe(subscriber);
122                        });
123                }
124                else {
125                        return Mono.error(new IllegalStateException("send() has already been called"));
126                }
127        }
128
129        /**
130         * Whether the underlying WebSocket API has flow control and can suspend and
131         * resume the receiving of messages.
132         * <p><strong>Note:</strong> Sub-classes are encouraged to start out in
133         * suspended mode, if possible, and wait until demand is received.
134         */
135        protected abstract boolean canSuspendReceiving();
136
137        /**
138         * Suspend receiving until received message(s) are processed and more demand
139         * is generated by the downstream Subscriber.
140         * <p><strong>Note:</strong> if the underlying WebSocket API does not provide
141         * flow control for receiving messages, this method should be a no-op
142         * and {@link #canSuspendReceiving()} should return {@code false}.
143         */
144        protected abstract void suspendReceiving();
145
146        /**
147         * Resume receiving new message(s) after demand is generated by the
148         * downstream Subscriber.
149         * <p><strong>Note:</strong> if the underlying WebSocket API does not provide
150         * flow control for receiving messages, this method should be a no-op
151         * and {@link #canSuspendReceiving()} should return {@code false}.
152         */
153        protected abstract void resumeReceiving();
154
155        /**
156         * Send the given WebSocket message.
157         * <p><strong>Note:</strong> Sub-classes are responsible for releasing the
158         * payload data buffer, once fully written, if pooled buffers apply to the
159         * underlying container.
160         */
161        protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;
162
163
164        // WebSocketHandler adapter delegate methods
165
166        /** Handle a message callback from the WebSocketHandler adapter. */
167        void handleMessage(Type type, WebSocketMessage message) {
168                this.receivePublisher.handleMessage(message);
169        }
170
171        /** Handle an error callback from the WebSocketHandler adapter. */
172        void handleError(Throwable ex) {
173                this.receivePublisher.onError(ex);
174                WebSocketSendProcessor sendProcessor = this.sendProcessor;
175                if (sendProcessor != null) {
176                        sendProcessor.cancel();
177                        sendProcessor.onError(ex);
178                }
179        }
180
181        /** Handle a close callback from the WebSocketHandler adapter. */
182        void handleClose(CloseStatus reason) {
183                this.receivePublisher.onAllDataRead();
184                WebSocketSendProcessor sendProcessor = this.sendProcessor;
185                if (sendProcessor != null) {
186                        sendProcessor.cancel();
187                        sendProcessor.onComplete();
188                }
189        }
190
191
192        // Subscriber<Void> implementation
193
194        @Override
195        public void onSubscribe(Subscription subscription) {
196                subscription.request(Long.MAX_VALUE);
197        }
198
199        @Override
200        public void onNext(Void aVoid) {
201                // no op
202        }
203
204        @Override
205        public void onError(Throwable ex) {
206                if (this.completionMono != null) {
207                        this.completionMono.onError(ex);
208                }
209                int code = CloseStatus.SERVER_ERROR.getCode();
210                close(new CloseStatus(code, ex.getMessage()));
211        }
212
213        @Override
214        public void onComplete() {
215                if (this.completionMono != null) {
216                        this.completionMono.onComplete();
217                }
218                close();
219        }
220
221
222        private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {
223
224                private volatile Queue<Object> pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get();
225
226
227                WebSocketReceivePublisher() {
228                        super(AbstractListenerWebSocketSession.this.getLogPrefix());
229                }
230
231
232                @Override
233                protected void checkOnDataAvailable() {
234                        resumeReceiving();
235                        int size = this.pendingMessages.size();
236                        if (rsReadLogger.isTraceEnabled()) {
237                                rsReadLogger.trace(getLogPrefix() + "checkOnDataAvailable (" + size + " pending)");
238                        }
239                        if (size > 0) {
240                                onDataAvailable();
241                        }
242                }
243
244                @Override
245                protected void readingPaused() {
246                        suspendReceiving();
247                }
248
249                @Override
250                @Nullable
251                protected WebSocketMessage read() throws IOException {
252                        return (WebSocketMessage) this.pendingMessages.poll();
253                }
254
255                void handleMessage(WebSocketMessage message) {
256                        if (logger.isTraceEnabled()) {
257                                logger.trace(getLogPrefix() + "Received " + message);
258                        }
259                        else if (rsReadLogger.isTraceEnabled()) {
260                                rsReadLogger.trace(getLogPrefix() + "Received " + message);
261                        }
262                        if (!this.pendingMessages.offer(message)) {
263                                discardData();
264                                throw new IllegalStateException(
265                                                "Too many messages. Please ensure WebSocketSession.receive() is subscribed to.");
266                        }
267                        onDataAvailable();
268                }
269
270                @Override
271                protected void discardData() {
272                        while (true) {
273                                WebSocketMessage message = (WebSocketMessage) this.pendingMessages.poll();
274                                if (message == null) {
275                                        return;
276                                }
277                                message.release();
278                        }
279                }
280        }
281
282
283        /**
284         * Processor to send web socket messages.
285         */
286        protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {
287
288                private volatile boolean isReady = true;
289
290
291                WebSocketSendProcessor() {
292                        super(receivePublisher.getLogPrefix());
293                }
294
295
296                @Override
297                protected boolean write(WebSocketMessage message) throws IOException {
298                        if (logger.isTraceEnabled()) {
299                                logger.trace(getLogPrefix() + "Sending " + message);
300                        }
301                        else if (rsWriteLogger.isTraceEnabled()) {
302                                rsWriteLogger.trace(getLogPrefix() + "Sending " + message);
303                        }
304                        // In case of IOException, onError handling should call discardData(WebSocketMessage)..
305                        return sendMessage(message);
306                }
307
308                @Override
309                protected boolean isDataEmpty(WebSocketMessage message) {
310                        return (message.getPayload().readableByteCount() == 0);
311                }
312
313                @Override
314                protected boolean isWritePossible() {
315                        return (this.isReady);
316                }
317
318                /**
319                 * Sub-classes can invoke this before sending a message (false) and
320                 * after receiving the async send callback (true) effective translating
321                 * async completion callback into simple flow control.
322                 */
323                public void setReadyToSend(boolean ready) {
324                        if (ready && rsWriteLogger.isTraceEnabled()) {
325                                rsWriteLogger.trace(getLogPrefix() + "Ready to send");
326                        }
327                        this.isReady = ready;
328                }
329
330                @Override
331                protected void discardData(WebSocketMessage message) {
332                        message.release();
333                }
334        }
335
336}