001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.adapter; 018 019import java.io.IOException; 020import java.nio.ByteBuffer; 021import java.nio.charset.StandardCharsets; 022 023import io.undertow.websockets.core.CloseMessage; 024import io.undertow.websockets.core.WebSocketCallback; 025import io.undertow.websockets.core.WebSocketChannel; 026import io.undertow.websockets.core.WebSockets; 027import reactor.core.publisher.Mono; 028import reactor.core.publisher.MonoProcessor; 029 030import org.springframework.core.io.buffer.DataBuffer; 031import org.springframework.core.io.buffer.DataBufferFactory; 032import org.springframework.core.io.buffer.DataBufferUtils; 033import org.springframework.lang.Nullable; 034import org.springframework.util.ObjectUtils; 035import org.springframework.web.reactive.socket.CloseStatus; 036import org.springframework.web.reactive.socket.HandshakeInfo; 037import org.springframework.web.reactive.socket.WebSocketMessage; 038import org.springframework.web.reactive.socket.WebSocketSession; 039 040/** 041 * Spring {@link WebSocketSession} implementation that adapts to an Undertow 042 * {@link io.undertow.websockets.core.WebSocketChannel}. 043 * 044 * @author Violeta Georgieva 045 * @author Rossen Stoyanchev 046 * @since 5.0 047 */ 048public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> { 049 050 public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, DataBufferFactory factory) { 051 this(channel, info, factory, null); 052 } 053 054 public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, 055 DataBufferFactory factory, @Nullable MonoProcessor<Void> completionMono) { 056 057 super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono); 058 suspendReceiving(); 059 } 060 061 062 @Override 063 protected boolean canSuspendReceiving() { 064 return true; 065 } 066 067 @Override 068 protected void suspendReceiving() { 069 getDelegate().suspendReceives(); 070 } 071 072 @Override 073 protected void resumeReceiving() { 074 getDelegate().resumeReceives(); 075 } 076 077 @Override 078 protected boolean sendMessage(WebSocketMessage message) throws IOException { 079 ByteBuffer buffer = message.getPayload().asByteBuffer(); 080 if (WebSocketMessage.Type.TEXT.equals(message.getType())) { 081 getSendProcessor().setReadyToSend(false); 082 String text = new String(buffer.array(), StandardCharsets.UTF_8); 083 WebSockets.sendText(text, getDelegate(), new SendProcessorCallback(message.getPayload())); 084 } 085 else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { 086 getSendProcessor().setReadyToSend(false); 087 WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback(message.getPayload())); 088 } 089 else if (WebSocketMessage.Type.PING.equals(message.getType())) { 090 getSendProcessor().setReadyToSend(false); 091 WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback(message.getPayload())); 092 } 093 else if (WebSocketMessage.Type.PONG.equals(message.getType())) { 094 getSendProcessor().setReadyToSend(false); 095 WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback(message.getPayload())); 096 } 097 else { 098 throw new IllegalArgumentException("Unexpected message type: " + message.getType()); 099 } 100 return true; 101 } 102 103 @Override 104 public Mono<Void> close(CloseStatus status) { 105 CloseMessage cm = new CloseMessage(status.getCode(), status.getReason()); 106 if (!getDelegate().isCloseFrameSent()) { 107 WebSockets.sendClose(cm, getDelegate(), null); 108 } 109 return Mono.empty(); 110 } 111 112 113 private final class SendProcessorCallback implements WebSocketCallback<Void> { 114 115 private final DataBuffer payload; 116 117 SendProcessorCallback(DataBuffer payload) { 118 this.payload = payload; 119 } 120 121 @Override 122 public void complete(WebSocketChannel channel, Void context) { 123 DataBufferUtils.release(this.payload); 124 getSendProcessor().setReadyToSend(true); 125 getSendProcessor().onWritePossible(); 126 } 127 128 @Override 129 public void onError(WebSocketChannel channel, Void context, Throwable throwable) { 130 DataBufferUtils.release(this.payload); 131 getSendProcessor().cancel(); 132 getSendProcessor().onError(throwable); 133 } 134 } 135 136}