001/*
002 * Copyright 2002-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.tcp.reactor;
018
019import reactor.io.net.ChannelStream;
020import reactor.rx.Promise;
021import reactor.rx.Promises;
022import reactor.rx.Streams;
023
024import org.springframework.messaging.Message;
025import org.springframework.messaging.tcp.TcpConnection;
026import org.springframework.util.concurrent.ListenableFuture;
027
028/**
029 * An implementation of {@link org.springframework.messaging.tcp.TcpConnection
030 * TcpConnection} based on the TCP client support of the Reactor project.
031 *
032 * @author Rossen Stoyanchev
033 * @since 4.2
034 * @param <P> the payload type of messages read or written to the TCP stream.
035 */
036public class Reactor2TcpConnection<P> implements TcpConnection<P> {
037
038        private final ChannelStream<Message<P>, Message<P>> channelStream;
039
040        private final Promise<Void> closePromise;
041
042
043        public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, Promise<Void> closePromise) {
044                this.channelStream = channelStream;
045                this.closePromise = closePromise;
046        }
047
048
049        @Override
050        public ListenableFuture<Void> send(Message<P> message) {
051                Promise<Void> afterWrite = Promises.prepare();
052                this.channelStream.writeWith(Streams.just(message)).subscribe(afterWrite);
053                return new PassThroughPromiseToListenableFutureAdapter<Void>(afterWrite);
054        }
055
056        @Override
057        @SuppressWarnings("deprecation")
058        public void onReadInactivity(Runnable runnable, long inactivityDuration) {
059                this.channelStream.on().readIdle(inactivityDuration, reactor.fn.Functions.<Void>consumer(runnable));
060        }
061
062        @Override
063        @SuppressWarnings("deprecation")
064        public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
065                this.channelStream.on().writeIdle(inactivityDuration, reactor.fn.Functions.<Void>consumer(runnable));
066        }
067
068        @Override
069        public void close() {
070                this.closePromise.onComplete();
071        }
072
073}