001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.reactive.socket.server.upgrade;
018
019import java.net.URI;
020import java.util.function.Supplier;
021
022import reactor.core.publisher.Mono;
023import reactor.netty.http.server.HttpServerResponse;
024import reactor.netty.http.server.WebsocketServerSpec;
025
026import org.springframework.core.io.buffer.NettyDataBufferFactory;
027import org.springframework.http.server.reactive.AbstractServerHttpResponse;
028import org.springframework.http.server.reactive.ServerHttpResponse;
029import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
030import org.springframework.lang.Nullable;
031import org.springframework.util.Assert;
032import org.springframework.web.reactive.socket.HandshakeInfo;
033import org.springframework.web.reactive.socket.WebSocketHandler;
034import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
035import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
036import org.springframework.web.server.ServerWebExchange;
037
038/**
039 * A {@link RequestUpgradeStrategy} for use with Reactor Netty.
040 *
041 * @author Rossen Stoyanchev
042 * @since 5.0
043 */
044public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
045
046        private final Supplier<WebsocketServerSpec.Builder> specBuilderSupplier;
047
048        @Nullable
049        private Integer maxFramePayloadLength;
050
051        @Nullable
052        private Boolean handlePing;
053
054
055        /**
056         * Create an instances with a default {@link WebsocketServerSpec.Builder}.
057         * @since 5.2.6
058         */
059        public ReactorNettyRequestUpgradeStrategy() {
060                this(WebsocketServerSpec::builder);
061        }
062
063
064        /**
065         * Create an instance with a pre-configured {@link WebsocketServerSpec.Builder}
066         * to use for WebSocket upgrades.
067         * @since 5.2.6
068         */
069        public ReactorNettyRequestUpgradeStrategy(Supplier<WebsocketServerSpec.Builder> builderSupplier) {
070                Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required");
071                this.specBuilderSupplier = builderSupplier;
072        }
073
074
075        /**
076         * Build an instance of {@code WebsocketServerSpec} that reflects the current
077         * configuration. This can be used to check the configured parameters except
078         * for sub-protocols which depend on the {@link WebSocketHandler} that is used
079         * for a given upgrade.
080         * @since 5.2.6
081         */
082        public WebsocketServerSpec getWebsocketServerSpec() {
083                return buildSpec(null);
084        }
085
086        WebsocketServerSpec buildSpec(@Nullable String subProtocol) {
087                WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get();
088                if (subProtocol != null) {
089                        builder.protocols(subProtocol);
090                }
091                if (this.maxFramePayloadLength != null) {
092                        builder.maxFramePayloadLength(this.maxFramePayloadLength);
093                }
094                if (this.handlePing != null) {
095                        builder.handlePing(this.handlePing);
096                }
097                return builder.build();
098        }
099
100        /**
101         * Configure the maximum allowable frame payload length. Setting this value
102         * to your application's requirement may reduce denial of service attacks
103         * using long data frames.
104         * <p>Corresponds to the argument with the same name in the constructor of
105         * {@link io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
106         * WebSocketServerHandshakerFactory} in Netty.
107         * <p>By default set to 65536 (64K).
108         * @param maxFramePayloadLength the max length for frames.
109         * @since 5.1
110         * @deprecated as of 5.2.6 in favor of providing a supplier of
111         * {@link WebsocketServerSpec.Builder} wiht a constructor argument.
112         */
113        @Deprecated
114        public void setMaxFramePayloadLength(Integer maxFramePayloadLength) {
115                this.maxFramePayloadLength = maxFramePayloadLength;
116        }
117
118        /**
119         * Return the configured max length for frames.
120         * @since 5.1
121         * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
122         */
123        @Deprecated
124        public int getMaxFramePayloadLength() {
125                return getWebsocketServerSpec().maxFramePayloadLength();
126        }
127
128        /**
129         * Configure whether to let ping frames through to be handled by the
130         * {@link WebSocketHandler} given to the upgrade method. By default, Reactor
131         * Netty automatically replies with pong frames in response to pings. This is
132         * useful in a proxy for allowing ping and pong frames through.
133         * <p>By default this is set to {@code false} in which case ping frames are
134         * handled automatically by Reactor Netty. If set to {@code true}, ping
135         * frames will be passed through to the {@link WebSocketHandler}.
136         * @param handlePing whether to let Ping frames through for handling
137         * @since 5.2.4
138         * @deprecated as of 5.2.6 in favor of providing a supplier of
139         * {@link WebsocketServerSpec.Builder} wiht a constructor argument.
140         */
141        @Deprecated
142        public void setHandlePing(boolean handlePing) {
143                this.handlePing = handlePing;
144        }
145
146        /**
147         * Return the configured {@link #setHandlePing(boolean)}.
148         * @since 5.2.4
149         * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
150         */
151        @Deprecated
152        public boolean getHandlePing() {
153                return getWebsocketServerSpec().handlePing();
154        }
155
156
157        @Override
158        public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
159                        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
160
161                ServerHttpResponse response = exchange.getResponse();
162                HttpServerResponse reactorResponse = getNativeResponse(response);
163                HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
164                NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
165                URI uri = exchange.getRequest().getURI();
166
167                // Trigger WebFlux preCommit actions and upgrade
168                return response.setComplete()
169                                .then(Mono.defer(() -> {
170                                        WebsocketServerSpec spec = buildSpec(subProtocol);
171                                        return reactorResponse.sendWebsocket((in, out) -> {
172                                                ReactorNettyWebSocketSession session =
173                                                                new ReactorNettyWebSocketSession(
174                                                                                in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength());
175                                                return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
176                                        }, spec);
177                                }));
178        }
179
180        private static HttpServerResponse getNativeResponse(ServerHttpResponse response) {
181                if (response instanceof AbstractServerHttpResponse) {
182                        return ((AbstractServerHttpResponse) response).getNativeResponse();
183                }
184                else if (response instanceof ServerHttpResponseDecorator) {
185                        return getNativeResponse(((ServerHttpResponseDecorator) response).getDelegate());
186                }
187                else {
188                        throw new IllegalArgumentException(
189                                        "Couldn't find native response in " + response.getClass().getName());
190                }
191        }
192
193}