001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.adapter; 018 019import java.nio.ByteBuffer; 020import java.nio.charset.StandardCharsets; 021import java.util.function.Function; 022 023import javax.websocket.CloseReason; 024import javax.websocket.Endpoint; 025import javax.websocket.EndpointConfig; 026import javax.websocket.PongMessage; 027import javax.websocket.Session; 028 029import org.springframework.core.io.buffer.DataBuffer; 030import org.springframework.lang.Nullable; 031import org.springframework.util.Assert; 032import org.springframework.web.reactive.socket.CloseStatus; 033import org.springframework.web.reactive.socket.WebSocketHandler; 034import org.springframework.web.reactive.socket.WebSocketMessage; 035import org.springframework.web.reactive.socket.WebSocketMessage.Type; 036import org.springframework.web.reactive.socket.WebSocketSession; 037 038/** 039 * Adapter for Java WebSocket API (JSR-356) that delegates events to a reactive 040 * {@link WebSocketHandler} and its session. 041 * 042 * @author Violeta Georgieva 043 * @author Rossen Stoyanchev 044 * @since 5.0 045 */ 046public class StandardWebSocketHandlerAdapter extends Endpoint { 047 048 private final WebSocketHandler delegateHandler; 049 050 private Function<Session, StandardWebSocketSession> sessionFactory; 051 052 @Nullable 053 private StandardWebSocketSession delegateSession; 054 055 056 public StandardWebSocketHandlerAdapter(WebSocketHandler handler, 057 Function<Session, StandardWebSocketSession> sessionFactory) { 058 059 Assert.notNull(handler, "WebSocketHandler is required"); 060 Assert.notNull(sessionFactory, "'sessionFactory' is required"); 061 this.delegateHandler = handler; 062 this.sessionFactory = sessionFactory; 063 } 064 065 066 @Override 067 public void onOpen(Session session, EndpointConfig config) { 068 this.delegateSession = this.sessionFactory.apply(session); 069 Assert.state(this.delegateSession != null, "No delegate session"); 070 071 session.addMessageHandler(String.class, message -> { 072 WebSocketMessage webSocketMessage = toMessage(message); 073 this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); 074 }); 075 session.addMessageHandler(ByteBuffer.class, message -> { 076 WebSocketMessage webSocketMessage = toMessage(message); 077 this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); 078 }); 079 session.addMessageHandler(PongMessage.class, message -> { 080 WebSocketMessage webSocketMessage = toMessage(message); 081 this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); 082 }); 083 084 this.delegateHandler.handle(this.delegateSession) 085 .checkpoint(session.getRequestURI() + " [StandardWebSocketHandlerAdapter]") 086 .subscribe(this.delegateSession); 087 } 088 089 private <T> WebSocketMessage toMessage(T message) { 090 WebSocketSession session = this.delegateSession; 091 Assert.state(session != null, "Cannot create message without a session"); 092 if (message instanceof String) { 093 byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); 094 return new WebSocketMessage(Type.TEXT, session.bufferFactory().wrap(bytes)); 095 } 096 else if (message instanceof ByteBuffer) { 097 DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message); 098 return new WebSocketMessage(Type.BINARY, buffer); 099 } 100 else if (message instanceof PongMessage) { 101 DataBuffer buffer = session.bufferFactory().wrap(((PongMessage) message).getApplicationData()); 102 return new WebSocketMessage(Type.PONG, buffer); 103 } 104 else { 105 throw new IllegalArgumentException("Unexpected message type: " + message); 106 } 107 } 108 109 @Override 110 public void onClose(Session session, CloseReason reason) { 111 if (this.delegateSession != null) { 112 int code = reason.getCloseCode().getCode(); 113 this.delegateSession.handleClose(new CloseStatus(code, reason.getReasonPhrase())); 114 } 115 } 116 117 @Override 118 public void onError(Session session, Throwable exception) { 119 if (this.delegateSession != null) { 120 this.delegateSession.handleError(exception); 121 } 122 } 123 124}