001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.tcp.reactor; 018 019import io.netty.buffer.ByteBuf; 020import reactor.core.publisher.DirectProcessor; 021import reactor.core.publisher.Mono; 022import reactor.netty.NettyInbound; 023import reactor.netty.NettyOutbound; 024 025import org.springframework.messaging.Message; 026import org.springframework.messaging.tcp.TcpConnection; 027import org.springframework.util.concurrent.ListenableFuture; 028import org.springframework.util.concurrent.MonoToListenableFutureAdapter; 029 030/** 031 * Reactor Netty based implementation of {@link TcpConnection}. 032 * 033 * @author Rossen Stoyanchev 034 * @since 5.0 035 * @param <P> the type of payload for outbound messages 036 */ 037public class ReactorNettyTcpConnection<P> implements TcpConnection<P> { 038 039 private final NettyInbound inbound; 040 041 private final NettyOutbound outbound; 042 043 private final ReactorNettyCodec<P> codec; 044 045 private final DirectProcessor<Void> closeProcessor; 046 047 048 public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, 049 ReactorNettyCodec<P> codec, DirectProcessor<Void> closeProcessor) { 050 051 this.inbound = inbound; 052 this.outbound = outbound; 053 this.codec = codec; 054 this.closeProcessor = closeProcessor; 055 } 056 057 058 @Override 059 public ListenableFuture<Void> send(Message<P> message) { 060 ByteBuf byteBuf = this.outbound.alloc().buffer(); 061 this.codec.encode(message, byteBuf); 062 Mono<Void> sendCompletion = this.outbound.send(Mono.just(byteBuf)).then(); 063 return new MonoToListenableFutureAdapter<>(sendCompletion); 064 } 065 066 @Override 067 @SuppressWarnings("deprecation") 068 public void onReadInactivity(Runnable runnable, long inactivityDuration) { 069 this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable)); 070 } 071 072 @Override 073 @SuppressWarnings("deprecation") 074 public void onWriteInactivity(Runnable runnable, long inactivityDuration) { 075 this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable)); 076 } 077 078 @Override 079 public void close() { 080 this.closeProcessor.onComplete(); 081 } 082 083}