001/*
002 * Copyright 2002-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.nio.ByteBuffer;
020
021import reactor.fn.Consumer;
022import reactor.fn.Function;
023import reactor.io.buffer.Buffer;
024import reactor.io.codec.Codec;
025
026import org.springframework.messaging.Message;
027import org.springframework.util.Assert;
028
029/**
030 * A Reactor TCP {@link Codec} for sending and receiving STOMP messages.
031 *
032 * @author Andy Wilkinson
033 * @author Rossen Stoyanchev
034 * @since 4.0
035 */
036public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {
037
038        private final Function<Message<byte[]>, Buffer> encodingFunction;
039
040        private final StompDecoder stompDecoder;
041
042
043        public Reactor2StompCodec() {
044                this(new StompEncoder(), new StompDecoder());
045        }
046
047        public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) {
048                Assert.notNull(encoder, "StompEncoder is required");
049                Assert.notNull(decoder, "StompDecoder is required");
050                this.encodingFunction = new EncodingFunction(encoder);
051                this.stompDecoder = decoder;
052        }
053
054
055        @Override
056        public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
057                return new DecodingFunction(this.stompDecoder, messageConsumer);
058        }
059
060        @Override
061        public Function<Message<byte[]>, Buffer> encoder() {
062                return this.encodingFunction;
063        }
064
065        @Override
066        public Buffer apply(Message<byte[]> message) {
067                return this.encodingFunction.apply(message);
068        }
069
070
071        private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
072
073                private final StompEncoder encoder;
074
075                public EncodingFunction(StompEncoder encoder) {
076                        this.encoder = encoder;
077                }
078
079                @Override
080                public Buffer apply(Message<byte[]> message) {
081                        byte[] bytes = this.encoder.encode(message);
082                        return new Buffer(ByteBuffer.wrap(bytes));
083                }
084        }
085
086
087        private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
088
089                private final StompDecoder decoder;
090
091                private final Consumer<Message<byte[]>> messageConsumer;
092
093                public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) {
094                        this.decoder = decoder;
095                        this.messageConsumer = next;
096                }
097
098                @Override
099                public Message<byte[]> apply(Buffer buffer) {
100                        for (Message<byte[]> message : this.decoder.decode(buffer.byteBuffer())) {
101                                this.messageConsumer.accept(message);
102                        }
103                        return null;
104                }
105        }
106
107}