/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016package org.springframework.web.reactive.socket.adapter;
017
018import io.netty.handler.codec.http.websocketx.WebSocketFrame;
019import org.reactivestreams.Publisher;
020import reactor.core.publisher.Flux;
021import reactor.core.publisher.Mono;
022import reactor.netty.NettyInbound;
023import reactor.netty.NettyOutbound;
024import reactor.netty.http.websocket.WebsocketInbound;
025import reactor.netty.http.websocket.WebsocketOutbound;
026
027import org.springframework.core.io.buffer.NettyDataBufferFactory;
028import org.springframework.web.reactive.socket.CloseStatus;
029import org.springframework.web.reactive.socket.HandshakeInfo;
030import org.springframework.web.reactive.socket.WebSocketMessage;
031import org.springframework.web.reactive.socket.WebSocketSession;
032
033
034/**
035 * {@link WebSocketSession} implementation for use with the Reactor Netty's
036 * {@link NettyInbound} and {@link NettyOutbound}.
037 *
038 * @author Rossen Stoyanchev
039 * @since 5.0
040 */
041public class ReactorNettyWebSocketSession
042                extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
043
044        private final int maxFramePayloadLength;
045
046
047        /**
048         * Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value.
049         */
050        public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
051                        HandshakeInfo info, NettyDataBufferFactory bufferFactory) {
052
053                this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE);
054        }
055
056        /**
057         * Constructor with an additional maxFramePayloadLength argument.
058         * @since 5.1
059         */
060        public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
061                        HandshakeInfo info, NettyDataBufferFactory bufferFactory,
062                        int maxFramePayloadLength) {
063
064                super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
065                this.maxFramePayloadLength = maxFramePayloadLength;
066        }
067
068
069        @Override
070        public Flux<WebSocketMessage> receive() {
071                return getDelegate().getInbound()
072                                .aggregateFrames(this.maxFramePayloadLength)
073                                .receiveFrames()
074                                .map(super::toMessage)
075                                .doOnNext(message -> {
076                                        if (logger.isTraceEnabled()) {
077                                                logger.trace(getLogPrefix() + "Received " + message);
078                                        }
079                                });
080        }
081
082        @Override
083        public Mono<Void> send(Publisher<WebSocketMessage> messages) {
084                Flux<WebSocketFrame> frames = Flux.from(messages)
085                                .doOnNext(message -> {
086                                        if (logger.isTraceEnabled()) {
087                                                logger.trace(getLogPrefix() + "Sending " + message);
088                                        }
089                                })
090                                .map(this::toFrame);
091                return getDelegate().getOutbound()
092                                .sendObject(frames)
093                                .then();
094        }
095
096        @Override
097        public Mono<Void> close(CloseStatus status) {
098                return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason());
099        }
100
101
102        /**
103         * Simple container for {@link NettyInbound} and {@link NettyOutbound}.
104         */
105        public static class WebSocketConnection {
106
107                private final WebsocketInbound inbound;
108
109                private final WebsocketOutbound outbound;
110
111
112                public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) {
113                        this.inbound = inbound;
114                        this.outbound = outbound;
115                }
116
117                public WebsocketInbound getInbound() {
118                        return this.inbound;
119                }
120
121                public WebsocketOutbound getOutbound() {
122                        return this.outbound;
123                }
124        }
125
126}