001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.reactive.socket.adapter;
018
019import java.io.IOException;
020import java.nio.ByteBuffer;
021import java.nio.charset.StandardCharsets;
022
023import org.eclipse.jetty.websocket.api.Session;
024import org.eclipse.jetty.websocket.api.SuspendToken;
025import org.eclipse.jetty.websocket.api.WriteCallback;
026import reactor.core.publisher.Mono;
027import reactor.core.publisher.MonoProcessor;
028
029import org.springframework.core.io.buffer.DataBufferFactory;
030import org.springframework.lang.Nullable;
031import org.springframework.util.Assert;
032import org.springframework.util.ObjectUtils;
033import org.springframework.web.reactive.socket.CloseStatus;
034import org.springframework.web.reactive.socket.HandshakeInfo;
035import org.springframework.web.reactive.socket.WebSocketMessage;
036import org.springframework.web.reactive.socket.WebSocketSession;
037
038/**
039 * Spring {@link WebSocketSession} implementation that adapts to a Jetty
040 * WebSocket {@link org.eclipse.jetty.websocket.api.Session}.
041 *
042 * @author Violeta Georgieva
043 * @author Rossen Stoyanchev
044 * @since 5.0
045 */
046public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> {
047
048        @Nullable
049        private volatile SuspendToken suspendToken;
050
051
052        public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
053                this(session, info, factory, null);
054        }
055
056        public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
057                        @Nullable MonoProcessor<Void> completionMono) {
058
059                super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
060                // TODO: suspend causes failures if invoked at this stage
061                // suspendReceiving();
062        }
063
064
065        @Override
066        protected boolean canSuspendReceiving() {
067                return true;
068        }
069
070        @Override
071        protected void suspendReceiving() {
072                Assert.state(this.suspendToken == null, "Already suspended");
073                this.suspendToken = getDelegate().suspend();
074        }
075
076        @Override
077        protected void resumeReceiving() {
078                SuspendToken tokenToUse = this.suspendToken;
079                this.suspendToken = null;
080                if (tokenToUse != null) {
081                        tokenToUse.resume();
082                }
083        }
084
085        @Override
086        protected boolean sendMessage(WebSocketMessage message) throws IOException {
087                ByteBuffer buffer = message.getPayload().asByteBuffer();
088                if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
089                        getSendProcessor().setReadyToSend(false);
090                        String text = new String(buffer.array(), StandardCharsets.UTF_8);
091                        getDelegate().getRemote().sendString(text, new SendProcessorCallback());
092                }
093                else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
094                        getSendProcessor().setReadyToSend(false);
095                        getDelegate().getRemote().sendBytes(buffer, new SendProcessorCallback());
096                }
097                else if (WebSocketMessage.Type.PING.equals(message.getType())) {
098                        getDelegate().getRemote().sendPing(buffer);
099                }
100                else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
101                        getDelegate().getRemote().sendPong(buffer);
102                }
103                else {
104                        throw new IllegalArgumentException("Unexpected message type: " + message.getType());
105                }
106                return true;
107        }
108
109        @Override
110        public Mono<Void> close(CloseStatus status) {
111                getDelegate().close(status.getCode(), status.getReason());
112                return Mono.empty();
113        }
114
115
116        private final class SendProcessorCallback implements WriteCallback {
117
118                @Override
119                public void writeFailed(Throwable x) {
120                        getSendProcessor().cancel();
121                        getSendProcessor().onError(x);
122                }
123
124                @Override
125                public void writeSuccess() {
126                        getSendProcessor().setReadyToSend(true);
127                        getSendProcessor().onWritePossible();
128                }
129
130        }
131
132}