001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.nio.ByteBuffer;
020import java.util.Collections;
021import java.util.List;
022import java.util.Queue;
023import java.util.concurrent.LinkedBlockingQueue;
024
025import org.springframework.lang.Nullable;
026import org.springframework.messaging.Message;
027import org.springframework.util.Assert;
028import org.springframework.util.LinkedMultiValueMap;
029import org.springframework.util.MultiValueMap;
030
031/**
032 * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
033 * that buffers content remaining in the input ByteBuffer after the parent
034 * class has read all (complete) STOMP frames from it. The remaining content
035 * represents an incomplete STOMP frame. When called repeatedly with additional
036 * data, the decode method returns one or more messages or, if there is not
037 * enough data still, continues to buffer.
038 *
039 * <p>A single instance of this decoder can be invoked repeatedly to read all
040 * messages from a single stream (e.g. WebSocket session) as long as decoding
041 * does not fail. If there is an exception, StompDecoder instance should not
042 * be used any more as its internal state is not guaranteed to be consistent.
043 * It is expected that the underlying session is closed at that point.
044 *
045 * @author Rossen Stoyanchev
046 * @since 4.0.3
047 * @see StompDecoder
048 */
049public class BufferingStompDecoder {
050
051        private final StompDecoder stompDecoder;
052
053        private final int bufferSizeLimit;
054
055        private final Queue<ByteBuffer> chunks = new LinkedBlockingQueue<>();
056
057        @Nullable
058        private volatile Integer expectedContentLength;
059
060
061        /**
062         * Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}.
063         * @param stompDecoder the target decoder to wrap
064         * @param bufferSizeLimit the buffer size limit
065         */
066        public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) {
067                Assert.notNull(stompDecoder, "StompDecoder is required");
068                Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
069                this.stompDecoder = stompDecoder;
070                this.bufferSizeLimit = bufferSizeLimit;
071        }
072
073
074        /**
075         * Return the wrapped {@link StompDecoder}.
076         */
077        public final StompDecoder getStompDecoder() {
078                return this.stompDecoder;
079        }
080
081        /**
082         * Return the configured buffer size limit.
083         */
084        public final int getBufferSizeLimit() {
085                return this.bufferSizeLimit;
086        }
087
088
089        /**
090         * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
091         * list of {@link Message Messages}.
092         * <p>If there was enough data to parse a "content-length" header, then the
093         * value is used to determine how much more data is needed before a new
094         * attempt to decode is made.
095         * <p>If there was not enough data to parse the "content-length", or if there
096         * is "content-length" header, every subsequent call to decode attempts to
097         * parse again with all available data. Therefore the presence of a "content-length"
098         * header helps to optimize the decoding of large messages.
099         * @param newBuffer a buffer containing new data to decode
100         * @return decoded messages or an empty list
101         * @throws StompConversionException raised in case of decoding issues
102         */
103        public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
104                this.chunks.add(newBuffer);
105                checkBufferLimits();
106
107                Integer contentLength = this.expectedContentLength;
108                if (contentLength != null && getBufferSize() < contentLength) {
109                        return Collections.emptyList();
110                }
111
112                ByteBuffer bufferToDecode = assembleChunksAndReset();
113                MultiValueMap<String, String> headers = new LinkedMultiValueMap<>();
114                List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers);
115
116                if (bufferToDecode.hasRemaining()) {
117                        this.chunks.add(bufferToDecode);
118                        this.expectedContentLength = StompHeaderAccessor.getContentLength(headers);
119                }
120
121                return messages;
122        }
123
124        private ByteBuffer assembleChunksAndReset() {
125                ByteBuffer result;
126                if (this.chunks.size() == 1) {
127                        result = this.chunks.remove();
128                }
129                else {
130                        result = ByteBuffer.allocate(getBufferSize());
131                        for (ByteBuffer partial : this.chunks) {
132                                result.put(partial);
133                        }
134                        result.flip();
135                }
136                this.chunks.clear();
137                this.expectedContentLength = null;
138                return result;
139        }
140
141        private void checkBufferLimits() {
142                Integer contentLength = this.expectedContentLength;
143                if (contentLength != null && contentLength > this.bufferSizeLimit) {
144                        throw new StompConversionException(
145                                        "STOMP 'content-length' header value " + this.expectedContentLength +
146                                        "  exceeds configured buffer size limit " + this.bufferSizeLimit);
147                }
148                if (getBufferSize() > this.bufferSizeLimit) {
149                        throw new StompConversionException("The configured STOMP buffer size limit of " +
150                                        this.bufferSizeLimit + " bytes has been exceeded");
151                }
152        }
153
154        /**
155         * Calculate the current buffer size.
156         */
157        public int getBufferSize() {
158                int size = 0;
159                for (ByteBuffer buffer : this.chunks) {
160                        size = size + buffer.remaining();
161                }
162                return size;
163        }
164
165        /**
166         * Get the expected content length of the currently buffered, incomplete STOMP frame.
167         */
168        @Nullable
169        public Integer getExpectedContentLength() {
170                return this.expectedContentLength;
171        }
172
173}