001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.adapter; 018 019import java.nio.ByteBuffer; 020import java.nio.charset.StandardCharsets; 021import java.util.function.Function; 022 023import org.eclipse.jetty.websocket.api.Session; 024import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; 025import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; 026import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; 027import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; 028import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; 029import org.eclipse.jetty.websocket.api.annotations.WebSocket; 030import org.eclipse.jetty.websocket.api.extensions.Frame; 031import org.eclipse.jetty.websocket.common.OpCode; 032 033import org.springframework.core.io.buffer.DataBuffer; 034import org.springframework.lang.Nullable; 035import org.springframework.util.Assert; 036import org.springframework.web.reactive.socket.CloseStatus; 037import org.springframework.web.reactive.socket.WebSocketHandler; 038import org.springframework.web.reactive.socket.WebSocketMessage; 039import org.springframework.web.reactive.socket.WebSocketMessage.Type; 040import org.springframework.web.reactive.socket.WebSocketSession; 041 042/** 043 * Jetty {@link WebSocket @WebSocket} handler that delegates events to a 044 * reactive {@link WebSocketHandler} and its session. 045 * 046 * @author Violeta Georgieva 047 * @author Rossen Stoyanchev 048 * @since 5.0 049 */ 050@WebSocket 051public class JettyWebSocketHandlerAdapter { 052 053 private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); 054 055 056 private final WebSocketHandler delegateHandler; 057 058 private final Function<Session, JettyWebSocketSession> sessionFactory; 059 060 @Nullable 061 private JettyWebSocketSession delegateSession; 062 063 064 public JettyWebSocketHandlerAdapter(WebSocketHandler handler, 065 Function<Session, JettyWebSocketSession> sessionFactory) { 066 067 Assert.notNull(handler, "WebSocketHandler is required"); 068 Assert.notNull(sessionFactory, "'sessionFactory' is required"); 069 this.delegateHandler = handler; 070 this.sessionFactory = sessionFactory; 071 } 072 073 074 @OnWebSocketConnect 075 public void onWebSocketConnect(Session session) { 076 this.delegateSession = this.sessionFactory.apply(session); 077 this.delegateHandler.handle(this.delegateSession) 078 .checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]") 079 .subscribe(this.delegateSession); 080 } 081 082 @OnWebSocketMessage 083 public void onWebSocketText(String message) { 084 if (this.delegateSession != null) { 085 WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message); 086 this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); 087 } 088 } 089 090 @OnWebSocketMessage 091 public void onWebSocketBinary(byte[] message, int offset, int length) { 092 if (this.delegateSession != null) { 093 ByteBuffer buffer = ByteBuffer.wrap(message, offset, length); 094 WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer); 095 this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); 096 } 097 } 098 099 @OnWebSocketFrame 100 public void onWebSocketFrame(Frame frame) { 101 if (this.delegateSession != null) { 102 if (OpCode.PONG == frame.getOpCode()) { 103 ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD); 104 WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer); 105 this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); 106 } 107 } 108 } 109 110 private <T> WebSocketMessage toMessage(Type type, T message) { 111 WebSocketSession session = this.delegateSession; 112 Assert.state(session != null, "Cannot create message without a session"); 113 if (Type.TEXT.equals(type)) { 114 byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); 115 DataBuffer buffer = session.bufferFactory().wrap(bytes); 116 return new WebSocketMessage(Type.TEXT, buffer); 117 } 118 else if (Type.BINARY.equals(type)) { 119 DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message); 120 return new WebSocketMessage(Type.BINARY, buffer); 121 } 122 else if (Type.PONG.equals(type)) { 123 DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message); 124 return new WebSocketMessage(Type.PONG, buffer); 125 } 126 else { 127 throw new IllegalArgumentException("Unexpected message type: " + message); 128 } 129 } 130 131 @OnWebSocketClose 132 public void onWebSocketClose(int statusCode, String reason) { 133 if (this.delegateSession != null) { 134 this.delegateSession.handleClose(new CloseStatus(statusCode, reason)); 135 } 136 } 137 138 @OnWebSocketError 139 public void onWebSocketError(Throwable cause) { 140 if (this.delegateSession != null) { 141 this.delegateSession.handleError(cause); 142 } 143 } 144 145}