001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.socket.adapter; 018 019import java.io.IOException; 020import java.nio.ByteBuffer; 021import java.nio.charset.StandardCharsets; 022 023import org.eclipse.jetty.websocket.api.Session; 024import org.eclipse.jetty.websocket.api.SuspendToken; 025import org.eclipse.jetty.websocket.api.WriteCallback; 026import reactor.core.publisher.Mono; 027import reactor.core.publisher.MonoProcessor; 028 029import org.springframework.core.io.buffer.DataBufferFactory; 030import org.springframework.lang.Nullable; 031import org.springframework.util.Assert; 032import org.springframework.util.ObjectUtils; 033import org.springframework.web.reactive.socket.CloseStatus; 034import org.springframework.web.reactive.socket.HandshakeInfo; 035import org.springframework.web.reactive.socket.WebSocketMessage; 036import org.springframework.web.reactive.socket.WebSocketSession; 037 038/** 039 * Spring {@link WebSocketSession} implementation that adapts to a Jetty 040 * WebSocket {@link org.eclipse.jetty.websocket.api.Session}. 041 * 042 * @author Violeta Georgieva 043 * @author Rossen Stoyanchev 044 * @since 5.0 045 */ 046public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> { 047 048 @Nullable 049 private volatile SuspendToken suspendToken; 050 051 052 public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { 053 this(session, info, factory, null); 054 } 055 056 public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, 057 @Nullable MonoProcessor<Void> completionMono) { 058 059 super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono); 060 // TODO: suspend causes failures if invoked at this stage 061 // suspendReceiving(); 062 } 063 064 065 @Override 066 protected boolean canSuspendReceiving() { 067 return true; 068 } 069 070 @Override 071 protected void suspendReceiving() { 072 Assert.state(this.suspendToken == null, "Already suspended"); 073 this.suspendToken = getDelegate().suspend(); 074 } 075 076 @Override 077 protected void resumeReceiving() { 078 SuspendToken tokenToUse = this.suspendToken; 079 this.suspendToken = null; 080 if (tokenToUse != null) { 081 tokenToUse.resume(); 082 } 083 } 084 085 @Override 086 protected boolean sendMessage(WebSocketMessage message) throws IOException { 087 ByteBuffer buffer = message.getPayload().asByteBuffer(); 088 if (WebSocketMessage.Type.TEXT.equals(message.getType())) { 089 getSendProcessor().setReadyToSend(false); 090 String text = new String(buffer.array(), StandardCharsets.UTF_8); 091 getDelegate().getRemote().sendString(text, new SendProcessorCallback()); 092 } 093 else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { 094 getSendProcessor().setReadyToSend(false); 095 getDelegate().getRemote().sendBytes(buffer, new SendProcessorCallback()); 096 } 097 else if (WebSocketMessage.Type.PING.equals(message.getType())) { 098 getDelegate().getRemote().sendPing(buffer); 099 } 100 else if (WebSocketMessage.Type.PONG.equals(message.getType())) { 101 getDelegate().getRemote().sendPong(buffer); 102 } 103 else { 104 throw new IllegalArgumentException("Unexpected message type: " + message.getType()); 105 } 106 return true; 107 } 108 109 @Override 110 public Mono<Void> close(CloseStatus status) { 111 getDelegate().close(status.getCode(), status.getReason()); 112 return Mono.empty(); 113 } 114 115 116 private final class SendProcessorCallback implements WriteCallback { 117 118 @Override 119 public void writeFailed(Throwable x) { 120 getSendProcessor().cancel(); 121 getSendProcessor().onError(x); 122 } 123 124 @Override 125 public void writeSuccess() { 126 getSendProcessor().setReadyToSend(true); 127 getSendProcessor().onWritePossible(); 128 } 129 130 } 131 132}