001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.reactive.socket.adapter;
018
019import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
020
021import javax.websocket.Session;
022
023import org.apache.tomcat.websocket.WsSession;
024import reactor.core.publisher.MonoProcessor;
025
026import org.springframework.core.io.buffer.DataBufferFactory;
027import org.springframework.web.reactive.socket.HandshakeInfo;
028import org.springframework.web.reactive.socket.WebSocketSession;
029
030/**
031 * Spring {@link WebSocketSession} adapter for Tomcat's
032 * {@link javax.websocket.Session}.
033 *
034 * @author Violeta Georgieva
035 * @since 5.0
036 */
037public class TomcatWebSocketSession extends StandardWebSocketSession {
038
039        private static final AtomicIntegerFieldUpdater<TomcatWebSocketSession> SUSPENDED =
040                        AtomicIntegerFieldUpdater.newUpdater(TomcatWebSocketSession.class, "suspended");
041
042        @SuppressWarnings("unused")
043        private volatile int suspended;
044
045
046        public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
047                super(session, info, factory);
048        }
049
050        public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
051                        MonoProcessor<Void> completionMono) {
052
053                super(session, info, factory, completionMono);
054                suspendReceiving();
055        }
056
057
058        @Override
059        protected boolean canSuspendReceiving() {
060                return true;
061        }
062
063        @Override
064        protected void suspendReceiving() {
065                if (SUSPENDED.compareAndSet(this, 0, 1)) {
066                        ((WsSession) getDelegate()).suspend();
067                }
068        }
069
070        @Override
071        protected void resumeReceiving() {
072                if (SUSPENDED.compareAndSet(this, 1, 0)) {
073                        ((WsSession) getDelegate()).resume();
074                }
075        }
076
077}