001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.server.upgrade; 018 019import java.net.URI; 020import java.util.function.Supplier; 021 022import reactor.core.publisher.Mono; 023import reactor.netty.http.server.HttpServerResponse; 024import reactor.netty.http.server.WebsocketServerSpec; 025 026import org.springframework.core.io.buffer.NettyDataBufferFactory; 027import org.springframework.http.server.reactive.AbstractServerHttpResponse; 028import org.springframework.http.server.reactive.ServerHttpResponse; 029import org.springframework.http.server.reactive.ServerHttpResponseDecorator; 030import org.springframework.lang.Nullable; 031import org.springframework.util.Assert; 032import org.springframework.web.reactive.socket.HandshakeInfo; 033import org.springframework.web.reactive.socket.WebSocketHandler; 034import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession; 035import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; 036import org.springframework.web.server.ServerWebExchange; 037 038/** 039 * A {@link RequestUpgradeStrategy} for use with Reactor Netty. 040 * 041 * @author Rossen Stoyanchev 042 * @since 5.0 043 */ 044public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { 045 046 private final Supplier<WebsocketServerSpec.Builder> specBuilderSupplier; 047 048 @Nullable 049 private Integer maxFramePayloadLength; 050 051 @Nullable 052 private Boolean handlePing; 053 054 055 /** 056 * Create an instances with a default {@link WebsocketServerSpec.Builder}. 057 * @since 5.2.6 058 */ 059 public ReactorNettyRequestUpgradeStrategy() { 060 this(WebsocketServerSpec::builder); 061 } 062 063 064 /** 065 * Create an instance with a pre-configured {@link WebsocketServerSpec.Builder} 066 * to use for WebSocket upgrades. 067 * @since 5.2.6 068 */ 069 public ReactorNettyRequestUpgradeStrategy(Supplier<WebsocketServerSpec.Builder> builderSupplier) { 070 Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required"); 071 this.specBuilderSupplier = builderSupplier; 072 } 073 074 075 /** 076 * Build an instance of {@code WebsocketServerSpec} that reflects the current 077 * configuration. This can be used to check the configured parameters except 078 * for sub-protocols which depend on the {@link WebSocketHandler} that is used 079 * for a given upgrade. 080 * @since 5.2.6 081 */ 082 public WebsocketServerSpec getWebsocketServerSpec() { 083 return buildSpec(null); 084 } 085 086 WebsocketServerSpec buildSpec(@Nullable String subProtocol) { 087 WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get(); 088 if (subProtocol != null) { 089 builder.protocols(subProtocol); 090 } 091 if (this.maxFramePayloadLength != null) { 092 builder.maxFramePayloadLength(this.maxFramePayloadLength); 093 } 094 if (this.handlePing != null) { 095 builder.handlePing(this.handlePing); 096 } 097 return builder.build(); 098 } 099 100 /** 101 * Configure the maximum allowable frame payload length. Setting this value 102 * to your application's requirement may reduce denial of service attacks 103 * using long data frames. 104 * <p>Corresponds to the argument with the same name in the constructor of 105 * {@link io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory 106 * WebSocketServerHandshakerFactory} in Netty. 107 * <p>By default set to 65536 (64K). 108 * @param maxFramePayloadLength the max length for frames. 109 * @since 5.1 110 * @deprecated as of 5.2.6 in favor of providing a supplier of 111 * {@link WebsocketServerSpec.Builder} wiht a constructor argument. 112 */ 113 @Deprecated 114 public void setMaxFramePayloadLength(Integer maxFramePayloadLength) { 115 this.maxFramePayloadLength = maxFramePayloadLength; 116 } 117 118 /** 119 * Return the configured max length for frames. 120 * @since 5.1 121 * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()} 122 */ 123 @Deprecated 124 public int getMaxFramePayloadLength() { 125 return getWebsocketServerSpec().maxFramePayloadLength(); 126 } 127 128 /** 129 * Configure whether to let ping frames through to be handled by the 130 * {@link WebSocketHandler} given to the upgrade method. By default, Reactor 131 * Netty automatically replies with pong frames in response to pings. This is 132 * useful in a proxy for allowing ping and pong frames through. 133 * <p>By default this is set to {@code false} in which case ping frames are 134 * handled automatically by Reactor Netty. If set to {@code true}, ping 135 * frames will be passed through to the {@link WebSocketHandler}. 136 * @param handlePing whether to let Ping frames through for handling 137 * @since 5.2.4 138 * @deprecated as of 5.2.6 in favor of providing a supplier of 139 * {@link WebsocketServerSpec.Builder} wiht a constructor argument. 140 */ 141 @Deprecated 142 public void setHandlePing(boolean handlePing) { 143 this.handlePing = handlePing; 144 } 145 146 /** 147 * Return the configured {@link #setHandlePing(boolean)}. 148 * @since 5.2.4 149 * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()} 150 */ 151 @Deprecated 152 public boolean getHandlePing() { 153 return getWebsocketServerSpec().handlePing(); 154 } 155 156 157 @Override 158 public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, 159 @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) { 160 161 ServerHttpResponse response = exchange.getResponse(); 162 HttpServerResponse reactorResponse = getNativeResponse(response); 163 HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); 164 NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); 165 URI uri = exchange.getRequest().getURI(); 166 167 // Trigger WebFlux preCommit actions and upgrade 168 return response.setComplete() 169 .then(Mono.defer(() -> { 170 WebsocketServerSpec spec = buildSpec(subProtocol); 171 return reactorResponse.sendWebsocket((in, out) -> { 172 ReactorNettyWebSocketSession session = 173 new ReactorNettyWebSocketSession( 174 in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength()); 175 return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]"); 176 }, spec); 177 })); 178 } 179 180 private static HttpServerResponse getNativeResponse(ServerHttpResponse response) { 181 if (response instanceof AbstractServerHttpResponse) { 182 return ((AbstractServerHttpResponse) response).getNativeResponse(); 183 } 184 else if (response instanceof ServerHttpResponseDecorator) { 185 return getNativeResponse(((ServerHttpResponseDecorator) response).getDelegate()); 186 } 187 else { 188 throw new IllegalArgumentException( 189 "Couldn't find native response in " + response.getClass().getName()); 190 } 191 } 192 193}