001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.tcp.reactor;
018
019import io.netty.buffer.ByteBuf;
020import reactor.core.publisher.DirectProcessor;
021import reactor.core.publisher.Mono;
022import reactor.netty.NettyInbound;
023import reactor.netty.NettyOutbound;
024
025import org.springframework.messaging.Message;
026import org.springframework.messaging.tcp.TcpConnection;
027import org.springframework.util.concurrent.ListenableFuture;
028import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
029
030/**
031 * Reactor Netty based implementation of {@link TcpConnection}.
032 *
033 * @author Rossen Stoyanchev
034 * @since 5.0
035 * @param <P> the type of payload for outbound messages
036 */
037public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
038
039        private final NettyInbound inbound;
040
041        private final NettyOutbound outbound;
042
043        private final ReactorNettyCodec<P> codec;
044
045        private final DirectProcessor<Void> closeProcessor;
046
047
048        public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
049                        ReactorNettyCodec<P> codec, DirectProcessor<Void> closeProcessor) {
050
051                this.inbound = inbound;
052                this.outbound = outbound;
053                this.codec = codec;
054                this.closeProcessor = closeProcessor;
055        }
056
057
058        @Override
059        public ListenableFuture<Void> send(Message<P> message) {
060                ByteBuf byteBuf = this.outbound.alloc().buffer();
061                this.codec.encode(message, byteBuf);
062                Mono<Void> sendCompletion = this.outbound.send(Mono.just(byteBuf)).then();
063                return new MonoToListenableFutureAdapter<>(sendCompletion);
064        }
065
066        @Override
067        @SuppressWarnings("deprecation")
068        public void onReadInactivity(Runnable runnable, long inactivityDuration) {
069                this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable));
070        }
071
072        @Override
073        @SuppressWarnings("deprecation")
074        public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
075                this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable));
076        }
077
078        @Override
079        public void close() {
080                this.closeProcessor.onComplete();
081        }
082
083}