001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.web.reactive.socket.adapter; 017 018import io.netty.handler.codec.http.websocketx.WebSocketFrame; 019import org.reactivestreams.Publisher; 020import reactor.core.publisher.Flux; 021import reactor.core.publisher.Mono; 022import reactor.netty.NettyInbound; 023import reactor.netty.NettyOutbound; 024import reactor.netty.http.websocket.WebsocketInbound; 025import reactor.netty.http.websocket.WebsocketOutbound; 026 027import org.springframework.core.io.buffer.NettyDataBufferFactory; 028import org.springframework.web.reactive.socket.CloseStatus; 029import org.springframework.web.reactive.socket.HandshakeInfo; 030import org.springframework.web.reactive.socket.WebSocketMessage; 031import org.springframework.web.reactive.socket.WebSocketSession; 032 033 034/** 035 * {@link WebSocketSession} implementation for use with the Reactor Netty's 036 * {@link NettyInbound} and {@link NettyOutbound}. 037 * 038 * @author Rossen Stoyanchev 039 * @since 5.0 040 */ 041public class ReactorNettyWebSocketSession 042 extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> { 043 044 private final int maxFramePayloadLength; 045 046 047 /** 048 * Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value. 049 */ 050 public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, 051 HandshakeInfo info, NettyDataBufferFactory bufferFactory) { 052 053 this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE); 054 } 055 056 /** 057 * Constructor with an additional maxFramePayloadLength argument. 058 * @since 5.1 059 */ 060 public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, 061 HandshakeInfo info, NettyDataBufferFactory bufferFactory, 062 int maxFramePayloadLength) { 063 064 super(new WebSocketConnection(inbound, outbound), info, bufferFactory); 065 this.maxFramePayloadLength = maxFramePayloadLength; 066 } 067 068 069 @Override 070 public Flux<WebSocketMessage> receive() { 071 return getDelegate().getInbound() 072 .aggregateFrames(this.maxFramePayloadLength) 073 .receiveFrames() 074 .map(super::toMessage) 075 .doOnNext(message -> { 076 if (logger.isTraceEnabled()) { 077 logger.trace(getLogPrefix() + "Received " + message); 078 } 079 }); 080 } 081 082 @Override 083 public Mono<Void> send(Publisher<WebSocketMessage> messages) { 084 Flux<WebSocketFrame> frames = Flux.from(messages) 085 .doOnNext(message -> { 086 if (logger.isTraceEnabled()) { 087 logger.trace(getLogPrefix() + "Sending " + message); 088 } 089 }) 090 .map(this::toFrame); 091 return getDelegate().getOutbound() 092 .sendObject(frames) 093 .then(); 094 } 095 096 @Override 097 public Mono<Void> close(CloseStatus status) { 098 return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason()); 099 } 100 101 102 /** 103 * Simple container for {@link NettyInbound} and {@link NettyOutbound}. 104 */ 105 public static class WebSocketConnection { 106 107 private final WebsocketInbound inbound; 108 109 private final WebsocketOutbound outbound; 110 111 112 public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) { 113 this.inbound = inbound; 114 this.outbound = outbound; 115 } 116 117 public WebsocketInbound getInbound() { 118 return this.inbound; 119 } 120 121 public WebsocketOutbound getOutbound() { 122 return this.outbound; 123 } 124 } 125 126}