001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.reactive.socket.adapter;
018
019import java.nio.ByteBuffer;
020import java.nio.charset.StandardCharsets;
021import java.util.function.Function;
022
023import org.eclipse.jetty.websocket.api.Session;
024import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
025import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
026import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
027import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
028import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
029import org.eclipse.jetty.websocket.api.annotations.WebSocket;
030import org.eclipse.jetty.websocket.api.extensions.Frame;
031import org.eclipse.jetty.websocket.common.OpCode;
032
033import org.springframework.core.io.buffer.DataBuffer;
034import org.springframework.lang.Nullable;
035import org.springframework.util.Assert;
036import org.springframework.web.reactive.socket.CloseStatus;
037import org.springframework.web.reactive.socket.WebSocketHandler;
038import org.springframework.web.reactive.socket.WebSocketMessage;
039import org.springframework.web.reactive.socket.WebSocketMessage.Type;
040import org.springframework.web.reactive.socket.WebSocketSession;
041
042/**
043 * Jetty {@link WebSocket @WebSocket} handler that delegates events to a
044 * reactive {@link WebSocketHandler} and its session.
045 *
046 * @author Violeta Georgieva
047 * @author Rossen Stoyanchev
048 * @since 5.0
049 */
050@WebSocket
051public class JettyWebSocketHandlerAdapter {
052
053        private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
054
055
056        private final WebSocketHandler delegateHandler;
057
058        private final Function<Session, JettyWebSocketSession> sessionFactory;
059
060        @Nullable
061        private JettyWebSocketSession delegateSession;
062
063
064        public JettyWebSocketHandlerAdapter(WebSocketHandler handler,
065                        Function<Session, JettyWebSocketSession> sessionFactory) {
066
067                Assert.notNull(handler, "WebSocketHandler is required");
068                Assert.notNull(sessionFactory, "'sessionFactory' is required");
069                this.delegateHandler = handler;
070                this.sessionFactory = sessionFactory;
071        }
072
073
074        @OnWebSocketConnect
075        public void onWebSocketConnect(Session session) {
076                this.delegateSession = this.sessionFactory.apply(session);
077                this.delegateHandler.handle(this.delegateSession)
078                                .checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]")
079                                .subscribe(this.delegateSession);
080        }
081
082        @OnWebSocketMessage
083        public void onWebSocketText(String message) {
084                if (this.delegateSession != null) {
085                        WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message);
086                        this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
087                }
088        }
089
090        @OnWebSocketMessage
091        public void onWebSocketBinary(byte[] message, int offset, int length) {
092                if (this.delegateSession != null) {
093                        ByteBuffer buffer = ByteBuffer.wrap(message, offset, length);
094                        WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer);
095                        this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
096                }
097        }
098
099        @OnWebSocketFrame
100        public void onWebSocketFrame(Frame frame) {
101                if (this.delegateSession != null) {
102                        if (OpCode.PONG == frame.getOpCode()) {
103                                ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
104                                WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer);
105                                this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
106                        }
107                }
108        }
109
110        private <T> WebSocketMessage toMessage(Type type, T message) {
111                WebSocketSession session = this.delegateSession;
112                Assert.state(session != null, "Cannot create message without a session");
113                if (Type.TEXT.equals(type)) {
114                        byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
115                        DataBuffer buffer = session.bufferFactory().wrap(bytes);
116                        return new WebSocketMessage(Type.TEXT, buffer);
117                }
118                else if (Type.BINARY.equals(type)) {
119                        DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message);
120                        return new WebSocketMessage(Type.BINARY, buffer);
121                }
122                else if (Type.PONG.equals(type)) {
123                        DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message);
124                        return new WebSocketMessage(Type.PONG, buffer);
125                }
126                else {
127                        throw new IllegalArgumentException("Unexpected message type: " + message);
128                }
129        }
130
131        @OnWebSocketClose
132        public void onWebSocketClose(int statusCode, String reason) {
133                if (this.delegateSession != null) {
134                        this.delegateSession.handleClose(new CloseStatus(statusCode, reason));
135                }
136        }
137
138        @OnWebSocketError
139        public void onWebSocketError(Throwable cause) {
140                if (this.delegateSession != null) {
141                        this.delegateSession.handleError(cause);
142                }
143        }
144
145}