001/* 002 * Copyright 2002-2016 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import java.nio.ByteBuffer; 020 021import reactor.fn.Consumer; 022import reactor.fn.Function; 023import reactor.io.buffer.Buffer; 024import reactor.io.codec.Codec; 025 026import org.springframework.messaging.Message; 027import org.springframework.util.Assert; 028 029/** 030 * A Reactor TCP {@link Codec} for sending and receiving STOMP messages. 031 * 032 * @author Andy Wilkinson 033 * @author Rossen Stoyanchev 034 * @since 4.0 035 */ 036public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> { 037 038 private final Function<Message<byte[]>, Buffer> encodingFunction; 039 040 private final StompDecoder stompDecoder; 041 042 043 public Reactor2StompCodec() { 044 this(new StompEncoder(), new StompDecoder()); 045 } 046 047 public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) { 048 Assert.notNull(encoder, "StompEncoder is required"); 049 Assert.notNull(decoder, "StompDecoder is required"); 050 this.encodingFunction = new EncodingFunction(encoder); 051 this.stompDecoder = decoder; 052 } 053 054 055 @Override 056 public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) { 057 return new DecodingFunction(this.stompDecoder, messageConsumer); 058 } 059 060 @Override 061 public Function<Message<byte[]>, Buffer> encoder() { 062 return this.encodingFunction; 063 } 064 065 @Override 066 public Buffer apply(Message<byte[]> message) { 067 return this.encodingFunction.apply(message); 068 } 069 070 071 private static class EncodingFunction implements Function<Message<byte[]>, Buffer> { 072 073 private final StompEncoder encoder; 074 075 public EncodingFunction(StompEncoder encoder) { 076 this.encoder = encoder; 077 } 078 079 @Override 080 public Buffer apply(Message<byte[]> message) { 081 byte[] bytes = this.encoder.encode(message); 082 return new Buffer(ByteBuffer.wrap(bytes)); 083 } 084 } 085 086 087 private static class DecodingFunction implements Function<Buffer, Message<byte[]>> { 088 089 private final StompDecoder decoder; 090 091 private final Consumer<Message<byte[]>> messageConsumer; 092 093 public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) { 094 this.decoder = decoder; 095 this.messageConsumer = next; 096 } 097 098 @Override 099 public Message<byte[]> apply(Buffer buffer) { 100 for (Message<byte[]> message : this.decoder.decode(buffer.byteBuffer())) { 101 this.messageConsumer.accept(message); 102 } 103 return null; 104 } 105 } 106 107}