001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.adapter; 018 019import java.io.IOException; 020import java.nio.ByteBuffer; 021import java.nio.charset.StandardCharsets; 022 023import javax.websocket.CloseReason; 024import javax.websocket.CloseReason.CloseCodes; 025import javax.websocket.SendHandler; 026import javax.websocket.SendResult; 027import javax.websocket.Session; 028 029import reactor.core.publisher.Mono; 030import reactor.core.publisher.MonoProcessor; 031 032import org.springframework.core.io.buffer.DataBufferFactory; 033import org.springframework.lang.Nullable; 034import org.springframework.web.reactive.socket.CloseStatus; 035import org.springframework.web.reactive.socket.HandshakeInfo; 036import org.springframework.web.reactive.socket.WebSocketMessage; 037import org.springframework.web.reactive.socket.WebSocketSession; 038 039/** 040 * Spring {@link WebSocketSession} adapter for a standard Java (JSR 356) 041 * {@link javax.websocket.Session}. 042 * 043 * @author Violeta Georgieva 044 * @author Rossen Stoyanchev 045 * @since 5.0 046 */ 047public class StandardWebSocketSession extends AbstractListenerWebSocketSession<Session> { 048 049 public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { 050 this(session, info, factory, null); 051 } 052 053 public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, 054 @Nullable MonoProcessor<Void> completionMono) { 055 056 super(session, session.getId(), info, factory, completionMono); 057 } 058 059 060 @Override 061 protected boolean canSuspendReceiving() { 062 return false; 063 } 064 065 @Override 066 protected void suspendReceiving() { 067 // no-op 068 } 069 070 @Override 071 protected void resumeReceiving() { 072 // no-op 073 } 074 075 @Override 076 protected boolean sendMessage(WebSocketMessage message) throws IOException { 077 ByteBuffer buffer = message.getPayload().asByteBuffer(); 078 if (WebSocketMessage.Type.TEXT.equals(message.getType())) { 079 getSendProcessor().setReadyToSend(false); 080 String text = new String(buffer.array(), StandardCharsets.UTF_8); 081 getDelegate().getAsyncRemote().sendText(text, new SendProcessorCallback()); 082 } 083 else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { 084 getSendProcessor().setReadyToSend(false); 085 getDelegate().getAsyncRemote().sendBinary(buffer, new SendProcessorCallback()); 086 } 087 else if (WebSocketMessage.Type.PING.equals(message.getType())) { 088 getDelegate().getAsyncRemote().sendPing(buffer); 089 } 090 else if (WebSocketMessage.Type.PONG.equals(message.getType())) { 091 getDelegate().getAsyncRemote().sendPong(buffer); 092 } 093 else { 094 throw new IllegalArgumentException("Unexpected message type: " + message.getType()); 095 } 096 return true; 097 } 098 099 @Override 100 public Mono<Void> close(CloseStatus status) { 101 try { 102 CloseReason.CloseCode code = CloseCodes.getCloseCode(status.getCode()); 103 getDelegate().close(new CloseReason(code, status.getReason())); 104 } 105 catch (IOException ex) { 106 return Mono.error(ex); 107 } 108 return Mono.empty(); 109 } 110 111 112 private final class SendProcessorCallback implements SendHandler { 113 114 @Override 115 public void onResult(SendResult result) { 116 if (result.isOK()) { 117 getSendProcessor().setReadyToSend(true); 118 getSendProcessor().onWritePossible(); 119 } 120 else { 121 getSendProcessor().cancel(); 122 getSendProcessor().onError(result.getException()); 123 } 124 } 125 126 } 127 128}