001/*
002 * Copyright 2002-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.nio.ByteBuffer;
020import java.util.Collections;
021import java.util.List;
022import java.util.Queue;
023import java.util.concurrent.LinkedBlockingQueue;
024
025import org.springframework.messaging.Message;
026import org.springframework.util.Assert;
027import org.springframework.util.LinkedMultiValueMap;
028import org.springframework.util.MultiValueMap;
029
030/**
031 * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
032 * that buffers content remaining in the input ByteBuffer after the parent
033 * class has read all (complete) STOMP frames from it. The remaining content
034 * represents an incomplete STOMP frame. When called repeatedly with additional
035 * data, the decode method returns one or more messages or, if there is not
036 * enough data still, continues to buffer.
037 *
038 * <p>A single instance of this decoder can be invoked repeatedly to read all
039 * messages from a single stream (e.g. WebSocket session) as long as decoding
040 * does not fail. If there is an exception, StompDecoder instance should not
041 * be used any more as its internal state is not guaranteed to be consistent.
042 * It is expected that the underlying session is closed at that point.
043 *
044 * @author Rossen Stoyanchev
045 * @since 4.0.3
046 * @see StompDecoder
047 */
048public class BufferingStompDecoder {
049
050        private final StompDecoder stompDecoder;
051
052        private final int bufferSizeLimit;
053
054        private final Queue<ByteBuffer> chunks = new LinkedBlockingQueue<ByteBuffer>();
055
056        private volatile Integer expectedContentLength;
057
058
059        /**
060         * Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}.
061         * @param stompDecoder the target decoder to wrap
062         * @param bufferSizeLimit the buffer size limit
063         */
064        public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) {
065                Assert.notNull(stompDecoder, "StompDecoder is required");
066                Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
067                this.stompDecoder = stompDecoder;
068                this.bufferSizeLimit = bufferSizeLimit;
069        }
070
071
072        /**
073         * Return the wrapped {@link StompDecoder}.
074         */
075        public final StompDecoder getStompDecoder() {
076                return this.stompDecoder;
077        }
078
079        /**
080         * Return the configured buffer size limit.
081         */
082        public final int getBufferSizeLimit() {
083                return this.bufferSizeLimit;
084        }
085
086
087        /**
088         * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
089         * list of {@link Message}s.
090         * <p>If there was enough data to parse a "content-length" header, then the
091         * value is used to determine how much more data is needed before a new
092         * attempt to decode is made.
093         * <p>If there was not enough data to parse the "content-length", or if there
094         * is "content-length" header, every subsequent call to decode attempts to
095         * parse again with all available data. Therefore the presence of a "content-length"
096         * header helps to optimize the decoding of large messages.
097         * @param newBuffer a buffer containing new data to decode
098         * @return decoded messages or an empty list
099         * @throws StompConversionException raised in case of decoding issues
100         */
101        public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
102                this.chunks.add(newBuffer);
103                checkBufferLimits();
104
105                if (this.expectedContentLength != null && getBufferSize() < this.expectedContentLength) {
106                        return Collections.<Message<byte[]>>emptyList();
107                }
108
109                ByteBuffer bufferToDecode = assembleChunksAndReset();
110                MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
111                List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers);
112
113                if (bufferToDecode.hasRemaining()) {
114                        this.chunks.add(bufferToDecode);
115                        this.expectedContentLength = StompHeaderAccessor.getContentLength(headers);
116                }
117
118                return messages;
119        }
120
121        private ByteBuffer assembleChunksAndReset() {
122                ByteBuffer result;
123                if (this.chunks.size() == 1) {
124                        result = this.chunks.remove();
125                }
126                else {
127                        result = ByteBuffer.allocate(getBufferSize());
128                        for (ByteBuffer partial : this.chunks) {
129                                result.put(partial);
130                        }
131                        result.flip();
132                }
133                this.chunks.clear();
134                this.expectedContentLength = null;
135                return result;
136        }
137
138        private void checkBufferLimits() {
139                if (this.expectedContentLength != null) {
140                        if (this.expectedContentLength > this.bufferSizeLimit) {
141                                throw new StompConversionException(
142                                                "STOMP 'content-length' header value " + this.expectedContentLength +
143                                                "  exceeds configured buffer size limit " + this.bufferSizeLimit);
144                        }
145                }
146                if (getBufferSize() > this.bufferSizeLimit) {
147                        throw new StompConversionException("The configured STOMP buffer size limit of " +
148                                        this.bufferSizeLimit + " bytes has been exceeded");
149                }
150        }
151
152        /**
153         * Calculate the current buffer size.
154         */
155        public int getBufferSize() {
156                int size = 0;
157                for (ByteBuffer buffer : this.chunks) {
158                        size = size + buffer.remaining();
159                }
160                return size;
161        }
162
163        /**
164         * Get the expected content length of the currently buffered, incomplete STOMP frame.
165         */
166        public Integer getExpectedContentLength() {
167                return this.expectedContentLength;
168        }
169
170}